Skip to main content

active_call/call/
active_call.rs

1use super::Command;
2use crate::{
3    CallOption, ReferOption,
4    event::{EventReceiver, EventSender, SessionEvent},
5    media::{
6        TrackId,
7        ambiance::AmbianceProcessor,
8        engine::StreamEngine,
9        negotiate::strip_ipv6_candidates,
10        processor::SubscribeProcessor,
11        recorder::RecorderOption,
12        stream::{MediaStream, MediaStreamBuilder},
13        track::{
14            Track, TrackConfig,
15            file::FileTrack,
16            media_pass::MediaPassTrack,
17            rtc::{RtcTrack, RtcTrackConfig},
18            tts::SynthesisHandle,
19            websocket::{WebsocketBytesReceiver, WebsocketTrack},
20        },
21    },
22    synthesis::{SynthesisCommand, SynthesisOption},
23};
24use crate::{
25    app::AppState,
26    call::{
27        CommandReceiver, CommandSender,
28        sip::{DialogStateReceiverGuard, Invitation, InviteDialogStates},
29    },
30    callrecord::{CallRecord, CallRecordEvent, CallRecordEventType, CallRecordHangupReason},
31    useragent::invitation::PendingDialog,
32};
33use anyhow::Result;
34use audio_codec::CodecType;
35use chrono::{DateTime, Utc};
36use rsipstack::dialog::{invitation::InviteOption, server_dialog::ServerInviteDialog};
37use serde::{Deserialize, Serialize};
38use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
39use tokio::{fs::File, select, sync::Mutex, sync::RwLock, time::sleep};
40use tokio_util::sync::CancellationToken;
41use tracing::{debug, info, warn};
42
43#[cfg(test)]
44mod tests {
45    use super::*;
46    use crate::app::AppStateBuilder;
47    use crate::callrecord::CallRecordHangupReason;
48    use crate::config::Config;
49    use crate::media::track::tts::SynthesisHandle;
50    use crate::synthesis::SynthesisCommand;
51    use tokio::sync::mpsc;
52
53    #[tokio::test]
54    async fn test_tts_ssrc_reuse_for_autohangup() -> Result<()> {
55        let mut config = Config::default();
56        config.udp_port = 0; // Use random port
57        config.media_cache_path = "/tmp/mediacache".to_string();
58        let stream_engine = Arc::new(StreamEngine::default());
59        let app_state = AppStateBuilder::new()
60            .with_config(config)
61            .with_stream_engine(stream_engine)
62            .build()
63            .await?;
64
65        let cancel_token = CancellationToken::new();
66        let session_id = "test-session".to_string();
67        let track_config = TrackConfig::default();
68
69        let mut option = crate::CallOption::default();
70        option.tts = Some(crate::synthesis::SynthesisOption::default());
71
72        let active_call = Arc::new(ActiveCall::new(
73            ActiveCallType::Sip,
74            cancel_token.clone(),
75            session_id.clone(),
76            app_state.invitation.clone(),
77            app_state.clone(),
78            track_config,
79            None,
80            false,
81            None,
82            None,
83        ));
84
85        {
86            let mut state = active_call.call_state.write().await;
87            state.option = Some(option);
88        }
89
90        let (tx, mut rx) = mpsc::unbounded_channel::<SynthesisCommand>();
91        let initial_ssrc = 12345;
92        let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);
93
94        // 1. Set initial TTS handle
95        {
96            let mut state = active_call.call_state.write().await;
97            state.tts_handle = Some(handle);
98            state.current_play_id = Some("play_1".to_string());
99        }
100
101        // 2. Call do_tts with auto_hangup=true and same play_id
102        active_call
103            .do_tts(
104                "hangup now".to_string(),
105                None,
106                Some("play_1".to_string()),
107                Some(true),
108                false,
109                true,
110                None,
111                None,
112                false,
113                None,
114            )
115            .await?;
116
117        // 3. Verify auto_hangup state uses the EXISTING SSRC
118        {
119            let state = active_call.call_state.read().await;
120            assert!(state.auto_hangup.is_some());
121            let (h_ssrc, reason) = state.auto_hangup.clone().unwrap();
122            assert_eq!(
123                h_ssrc, initial_ssrc,
124                "SSRC should be reused from existing handle"
125            );
126            assert_eq!(reason, CallRecordHangupReason::BySystem);
127        }
128
129        // 4. Verify command was sent to existing channel
130        let cmd = rx.try_recv().expect("Should have received tts command");
131        assert_eq!(cmd.text, "hangup now");
132
133        Ok(())
134    }
135
136    #[tokio::test]
137    async fn test_tts_new_ssrc_for_different_play_id() -> Result<()> {
138        let mut config = Config::default();
139        config.udp_port = 0; // Use random port
140        config.media_cache_path = "/tmp/mediacache".to_string();
141        let stream_engine = Arc::new(StreamEngine::default());
142        let app_state = AppStateBuilder::new()
143            .with_config(config)
144            .with_stream_engine(stream_engine)
145            .build()
146            .await?;
147
148        let active_call = Arc::new(ActiveCall::new(
149            ActiveCallType::Sip,
150            CancellationToken::new(),
151            "test-session".to_string(),
152            app_state.invitation.clone(),
153            app_state.clone(),
154            TrackConfig::default(),
155            None,
156            false,
157            None,
158            None,
159        ));
160
161        let mut tts_opt = crate::synthesis::SynthesisOption::default();
162        tts_opt.provider = Some(crate::synthesis::SynthesisType::Aliyun);
163        let mut option = crate::CallOption::default();
164        option.tts = Some(tts_opt);
165        {
166            let mut state = active_call.call_state.write().await;
167            state.option = Some(option);
168        }
169
170        let (tx, _rx) = mpsc::unbounded_channel();
171        let initial_ssrc = 111;
172        let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);
173
174        {
175            let mut state = active_call.call_state.write().await;
176            state.tts_handle = Some(handle);
177            state.current_play_id = Some("play_1".to_string());
178        }
179
180        // Call do_tts with DIFFERENT play_id
181        active_call
182            .do_tts(
183                "new play".to_string(),
184                None,
185                Some("play_2".to_string()),
186                Some(true),
187                false,
188                true,
189                None,
190                None,
191                false,
192                None,
193            )
194            .await?;
195
196        // Verify auto_hangup uses a NEW SSRC (because it should interrupt and start fresh)
197        {
198            let state = active_call.call_state.read().await;
199            let (h_ssrc, _) = state.auto_hangup.clone().unwrap();
200            assert_ne!(
201                h_ssrc, initial_ssrc,
202                "Should use a new SSRC for different play_id"
203            );
204        }
205
206        Ok(())
207    }
208}
209
210#[derive(Deserialize)]
211#[serde(rename_all = "camelCase")]
212pub struct CallParams {
213    pub id: Option<String>,
214    #[serde(rename = "dump")]
215    pub dump_events: Option<bool>,
216    #[serde(rename = "ping")]
217    pub ping_interval: Option<u32>,
218    pub server_side_track: Option<String>,
219}
220
221#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
222#[serde(rename_all = "camelCase")]
223pub enum ActiveCallType {
224    Webrtc,
225    B2bua,
226    WebSocket,
227    #[default]
228    Sip,
229}
230
231#[derive(Default)]
232pub struct ActiveCallState {
233    pub session_id: String,
234    pub start_time: DateTime<Utc>,
235    pub ring_time: Option<DateTime<Utc>>,
236    pub answer_time: Option<DateTime<Utc>>,
237    pub hangup_reason: Option<CallRecordHangupReason>,
238    pub last_status_code: u16,
239    pub option: Option<CallOption>,
240    pub answer: Option<String>,
241    pub ssrc: u32,
242    pub refer_callstate: Option<ActiveCallStateRef>,
243    pub extras: Option<HashMap<String, serde_json::Value>>,
244    pub is_refer: bool,
245
246    // Runtime state (migrated from ActiveCall to reduce multiple locks)
247    pub tts_handle: Option<SynthesisHandle>,
248    pub auto_hangup: Option<(u32, CallRecordHangupReason)>,
249    pub wait_input_timeout: Option<u32>,
250    pub moh: Option<String>,
251    pub current_play_id: Option<String>,
252    pub audio_receiver: Option<WebsocketBytesReceiver>,
253    pub ready_to_answer: Option<(String, Option<Box<dyn Track>>, ServerInviteDialog)>,
254}
255
256pub type ActiveCallRef = Arc<ActiveCall>;
257pub type ActiveCallStateRef = Arc<RwLock<ActiveCallState>>;
258
259pub struct ActiveCall {
260    pub call_state: ActiveCallStateRef,
261    pub cancel_token: CancellationToken,
262    pub call_type: ActiveCallType,
263    pub session_id: String,
264    pub media_stream: Arc<MediaStream>,
265    pub track_config: TrackConfig,
266    pub event_sender: EventSender,
267    pub app_state: AppState,
268    pub invitation: Invitation,
269    pub cmd_sender: CommandSender,
270    pub dump_events: bool,
271    pub server_side_track_id: TrackId,
272}
273
274pub struct ActiveCallGuard {
275    pub call: ActiveCallRef,
276    pub active_calls: usize,
277}
278
279impl ActiveCallGuard {
280    pub fn new(call: ActiveCallRef) -> Self {
281        let active_calls = {
282            call.app_state
283                .total_calls
284                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
285            let mut calls = call.app_state.active_calls.lock().unwrap();
286            calls.insert(call.session_id.clone(), call.clone());
287            calls.len()
288        };
289        Self { call, active_calls }
290    }
291}
292
293impl Drop for ActiveCallGuard {
294    fn drop(&mut self) {
295        self.call
296            .app_state
297            .active_calls
298            .lock()
299            .unwrap()
300            .remove(&self.call.session_id);
301    }
302}
303
304pub struct ActiveCallReceiver {
305    pub cmd_receiver: CommandReceiver,
306    pub dump_cmd_receiver: CommandReceiver,
307    pub dump_event_receiver: EventReceiver,
308}
309
310impl ActiveCall {
311    pub fn new(
312        call_type: ActiveCallType,
313        cancel_token: CancellationToken,
314        session_id: String,
315        invitation: Invitation,
316        app_state: AppState,
317        track_config: TrackConfig,
318        audio_receiver: Option<WebsocketBytesReceiver>,
319        dump_events: bool,
320        server_side_track_id: Option<TrackId>,
321        extras: Option<HashMap<String, serde_json::Value>>,
322    ) -> Self {
323        let event_sender = crate::event::create_event_sender();
324        let cmd_sender = tokio::sync::broadcast::Sender::<Command>::new(32);
325        let media_stream_builder = MediaStreamBuilder::new(event_sender.clone())
326            .with_id(session_id.clone())
327            .with_cancel_token(cancel_token.child_token());
328        let media_stream = Arc::new(media_stream_builder.build());
329        let call_state = Arc::new(RwLock::new(ActiveCallState {
330            session_id: session_id.clone(),
331            start_time: Utc::now(),
332            ssrc: rand::random::<u32>(),
333            extras,
334            audio_receiver,
335            ..Default::default()
336        }));
337        Self {
338            cancel_token,
339            call_type,
340            session_id,
341            call_state,
342            media_stream,
343            track_config,
344            event_sender,
345            app_state,
346            invitation,
347            cmd_sender,
348            dump_events,
349            server_side_track_id: server_side_track_id.unwrap_or("server-side-track".to_string()),
350        }
351    }
352
353    pub async fn enqueue_command(&self, command: Command) -> Result<()> {
354        self.cmd_sender
355            .send(command)
356            .map_err(|e| anyhow::anyhow!("Failed to send command: {}", e))?;
357        Ok(())
358    }
359
360    /// Create a new ActiveCallReceiver for this ActiveCall
361    /// `tokio::sync::broadcast` not cached messages, so need to early create receiver
362    /// before calling `serve()`
363    pub fn new_receiver(&self) -> ActiveCallReceiver {
364        ActiveCallReceiver {
365            cmd_receiver: self.cmd_sender.subscribe(),
366            dump_cmd_receiver: self.cmd_sender.subscribe(),
367            dump_event_receiver: self.event_sender.subscribe(),
368        }
369    }
370
371    pub async fn serve(&self, receiver: ActiveCallReceiver) -> Result<()> {
372        let ActiveCallReceiver {
373            mut cmd_receiver,
374            dump_cmd_receiver,
375            dump_event_receiver,
376        } = receiver;
377
378        let process_command_loop = async move {
379            while let Ok(command) = cmd_receiver.recv().await {
380                match self.dispatch(command).await {
381                    Ok(_) => (),
382                    Err(e) => {
383                        warn!(session_id = self.session_id, "{}", e);
384                        self.event_sender
385                            .send(SessionEvent::Error {
386                                track_id: self.session_id.clone(),
387                                timestamp: crate::media::get_timestamp(),
388                                sender: "command".to_string(),
389                                error: e.to_string(),
390                                code: None,
391                            })
392                            .ok();
393                    }
394                }
395            }
396        };
397        self.app_state
398            .total_calls
399            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
400
401        tokio::join!(
402            self.dump_loop(self.dump_events, dump_cmd_receiver, dump_event_receiver),
403            async {
404                select! {
405                    _ = process_command_loop => {
406                        info!(session_id = self.session_id, "command loop done");
407                    }
408                    _ = self.process() => {
409                        info!(session_id = self.session_id, "call serve done");
410                    }
411                    _ = self.cancel_token.cancelled() => {
412                        info!(session_id = self.session_id, "call cancelled - cleaning up resources");
413                    }
414                }
415                self.cancel_token.cancel();
416            }
417        );
418        Ok(())
419    }
420
421    async fn process(&self) -> Result<()> {
422        let mut event_receiver = self.event_sender.subscribe();
423
424        let input_timeout_expire = Arc::new(Mutex::new((0u64, 0u32)));
425        let input_timeout_expire_ref = input_timeout_expire.clone();
426        let event_sender = self.event_sender.clone();
427        let wait_input_timeout_loop = async {
428            loop {
429                let (start_time, expire) = { *input_timeout_expire.lock().await };
430                if expire > 0 && crate::media::get_timestamp() >= start_time + expire as u64 {
431                    info!(session_id = self.session_id, "wait input timeout reached");
432                    *input_timeout_expire.lock().await = (0, 0);
433                    event_sender
434                        .send(SessionEvent::Silence {
435                            track_id: self.server_side_track_id.clone(),
436                            timestamp: crate::media::get_timestamp(),
437                            start_time,
438                            duration: expire as u64,
439                            samples: None,
440                        })
441                        .ok();
442                }
443                sleep(Duration::from_millis(100)).await;
444            }
445        };
446        let server_side_track_id = self.server_side_track_id.clone();
447        let event_hook_loop = async move {
448            while let Ok(event) = event_receiver.recv().await {
449                match event {
450                    SessionEvent::Speaking { .. }
451                    | SessionEvent::Dtmf { .. }
452                    | SessionEvent::AsrDelta { .. }
453                    | SessionEvent::AsrFinal { .. }
454                    | SessionEvent::TrackStart { .. } => {
455                        *input_timeout_expire_ref.lock().await = (0, 0);
456                    }
457                    SessionEvent::TrackEnd {
458                        track_id,
459                        play_id,
460                        ssrc,
461                        ..
462                    } => {
463                        if track_id != server_side_track_id {
464                            continue;
465                        }
466
467                        let (moh_path, auto_hangup, wait_timeout_val) = {
468                            let mut state = self.call_state.write().await;
469                            if play_id != state.current_play_id {
470                                debug!(
471                                    session_id = self.session_id,
472                                    ?play_id,
473                                    current = ?state.current_play_id,
474                                    "ignoring interrupted track end"
475                                );
476                                continue;
477                            }
478                            state.current_play_id = None;
479                            (
480                                state.moh.clone(),
481                                state.auto_hangup.clone(),
482                                state.wait_input_timeout.take(),
483                            )
484                        };
485
486                        if let Some(path) = moh_path {
487                            info!(session_id = self.session_id, "looping moh: {}", path);
488                            let ssrc = rand::random::<u32>();
489                            let file_track = FileTrack::new(self.server_side_track_id.clone())
490                                .with_play_id(Some(path.clone()))
491                                .with_ssrc(ssrc)
492                                .with_path(path.clone())
493                                .with_cancel_token(self.cancel_token.child_token());
494                            self.update_track_wrapper(Box::new(file_track), Some(path))
495                                .await;
496                            continue;
497                        }
498
499                        if let Some((hangup_ssrc, hangup_reason)) = auto_hangup {
500                            if hangup_ssrc == ssrc {
501                                info!(
502                                    session_id = self.session_id,
503                                    ssrc, "auto hangup when track end track_id:{}", track_id
504                                );
505                                self.do_hangup(Some(hangup_reason), None).await.ok();
506                            }
507                        }
508
509                        if let Some(timeout) = wait_timeout_val {
510                            let expire = if timeout > 0 {
511                                (crate::media::get_timestamp(), timeout)
512                            } else {
513                                (0, 0)
514                            };
515                            *input_timeout_expire_ref.lock().await = expire;
516                        }
517                    }
518                    SessionEvent::Interrupt { receiver } => {
519                        let track_id =
520                            receiver.unwrap_or_else(|| self.server_side_track_id.clone());
521                        if track_id == self.server_side_track_id {
522                            debug!(
523                                session_id = self.session_id,
524                                "received interrupt event, stopping playback"
525                            );
526                            self.do_interrupt(true).await.ok();
527                        }
528                    }
529                    SessionEvent::Inactivity { track_id, .. } => {
530                        info!(
531                            session_id = self.session_id,
532                            track_id, "inactivity timeout reached, hanging up"
533                        );
534                        self.do_hangup(Some(CallRecordHangupReason::InactivityTimeout), None)
535                            .await
536                            .ok();
537                    }
538                    SessionEvent::Error { track_id, .. } => {
539                        if track_id != server_side_track_id {
540                            continue;
541                        }
542
543                        let moh_info = {
544                            let mut state = self.call_state.write().await;
545                            if let Some(path) = state.moh.clone() {
546                                let fallback = "./config/sounds/refer_moh.wav".to_string();
547                                let next_path = if path != fallback
548                                    && std::path::Path::new(&fallback).exists()
549                                {
550                                    info!(
551                                        session_id = self.session_id,
552                                        "moh error, switching to fallback: {}", fallback
553                                    );
554                                    state.moh = Some(fallback.clone());
555                                    fallback
556                                } else {
557                                    info!(
558                                        session_id = self.session_id,
559                                        "looping moh on error: {}", path
560                                    );
561                                    path
562                                };
563                                Some(next_path)
564                            } else {
565                                None
566                            }
567                        };
568
569                        if let Some(next_path) = moh_info {
570                            let ssrc = rand::random::<u32>();
571                            let file_track = FileTrack::new(self.server_side_track_id.clone())
572                                .with_play_id(Some(next_path.clone()))
573                                .with_ssrc(ssrc)
574                                .with_path(next_path.clone())
575                                .with_cancel_token(self.cancel_token.child_token());
576                            self.update_track_wrapper(Box::new(file_track), Some(next_path))
577                                .await;
578                            continue;
579                        }
580                    }
581                    _ => {}
582                }
583            }
584        };
585
586        select! {
587            _ = wait_input_timeout_loop=>{
588                info!(session_id = self.session_id, "wait input timeout loop done");
589            }
590            _ = self.media_stream.serve() => {
591                info!(session_id = self.session_id, "media stream loop done");
592            }
593            _ = event_hook_loop => {
594                info!(session_id = self.session_id, "event loop done");
595            }
596        }
597        Ok(())
598    }
599
600    async fn dispatch(&self, command: Command) -> Result<()> {
601        match command {
602            Command::Invite { option } => self.do_invite(option).await,
603            Command::Accept { option } => self.do_accept(option).await,
604            Command::Reject { reason, code } => {
605                self.do_reject(code.map(|c| (c as u16).into()), Some(reason))
606                    .await
607            }
608            Command::Ringing {
609                ringtone,
610                recorder,
611                early_media,
612            } => self.do_ringing(ringtone, recorder, early_media).await,
613            Command::Tts {
614                text,
615                speaker,
616                play_id,
617                auto_hangup,
618                streaming,
619                end_of_stream,
620                option,
621                wait_input_timeout,
622                base64,
623                cache_key,
624            } => {
625                self.do_tts(
626                    text,
627                    speaker,
628                    play_id,
629                    auto_hangup,
630                    streaming.unwrap_or_default(),
631                    end_of_stream.unwrap_or_default(),
632                    option,
633                    wait_input_timeout,
634                    base64.unwrap_or_default(),
635                    cache_key,
636                )
637                .await
638            }
639            Command::Play {
640                url,
641                play_id,
642                auto_hangup,
643                wait_input_timeout,
644            } => {
645                self.do_play(url, play_id, auto_hangup, wait_input_timeout)
646                    .await
647            }
648            Command::Hangup { reason, initiator } => {
649                let reason = reason.map(|r| {
650                    r.parse::<CallRecordHangupReason>()
651                        .unwrap_or(CallRecordHangupReason::BySystem)
652                });
653                self.do_hangup(reason, initiator).await
654            }
655            Command::Refer {
656                caller,
657                callee,
658                options,
659            } => self.do_refer(caller, callee, options).await,
660            Command::Mute { track_id } => self.do_mute(track_id).await,
661            Command::Unmute { track_id } => self.do_unmute(track_id).await,
662            Command::Pause {} => self.do_pause().await,
663            Command::Resume {} => self.do_resume().await,
664            Command::Interrupt {
665                graceful: passage,
666                fade_out_ms: _,
667            } => self.do_interrupt(passage.unwrap_or_default()).await,
668            Command::History { speaker, text } => self.do_history(speaker, text).await,
669        }
670    }
671
672    fn build_record_option(&self, option: &CallOption) -> Option<RecorderOption> {
673        if let Some(recorder_option) = &option.recorder {
674            let mut recorder_file = recorder_option.recorder_file.clone();
675            if recorder_file.contains("{id}") {
676                recorder_file = recorder_file.replace("{id}", &self.session_id);
677            }
678
679            let recorder_file = if recorder_file.is_empty() {
680                self.app_state.get_recorder_file(&self.session_id)
681            } else {
682                let p = Path::new(&recorder_file);
683                p.is_absolute()
684                    .then(|| recorder_file.clone())
685                    .unwrap_or_else(|| self.app_state.get_recorder_file(&recorder_file))
686            };
687            info!(
688                session_id = self.session_id,
689                recorder_file, "created recording file"
690            );
691
692            let track_samplerate = self.track_config.samplerate;
693            let recorder_samplerate = if track_samplerate > 0 {
694                track_samplerate
695            } else {
696                recorder_option.samplerate
697            };
698            let recorder_ptime = if recorder_option.ptime == 0 {
699                200
700            } else {
701                recorder_option.ptime
702            };
703            let requested_format = recorder_option
704                .format
705                .unwrap_or(self.app_state.config.recorder_format());
706            let format = requested_format.effective();
707            if requested_format != format {
708                warn!(
709                    session_id = self.session_id,
710                    requested = requested_format.extension(),
711                    "Recorder format fallback to wav due to unsupported feature"
712                );
713            }
714            let mut recorder_config = RecorderOption {
715                recorder_file,
716                samplerate: recorder_samplerate,
717                ptime: recorder_ptime,
718                format: Some(format),
719            };
720            recorder_config.ensure_path_extension(format);
721            Some(recorder_config)
722        } else {
723            None
724        }
725    }
726
727    async fn invite_or_accept(&self, mut option: CallOption, sender: String) -> Result<CallOption> {
728        // Merge with existing configuration (e.g., from playbook)
729        {
730            let state = self.call_state.read().await;
731            option = state.merge_option(option);
732        }
733
734        option.check_default();
735        if let Some(opt) = self.build_record_option(&option) {
736            self.media_stream.update_recorder_option(opt).await;
737        }
738
739        if let Some(opt) = &option.media_pass {
740            let track_id = self.server_side_track_id.clone();
741            let cancel_token = self.cancel_token.child_token();
742            let ssrc = rand::random::<u32>();
743            let media_pass_track = MediaPassTrack::new(
744                self.session_id.clone(),
745                ssrc,
746                track_id,
747                cancel_token,
748                opt.clone(),
749            );
750            self.update_track_wrapper(Box::new(media_pass_track), None)
751                .await;
752        }
753
754        info!(
755            session_id = self.session_id,
756            call_type = ?self.call_type,
757            sender,
758            ?option,
759            "caller with option"
760        );
761
762        match self.setup_caller_track(&option).await {
763            Ok(_) => return Ok(option),
764            Err(e) => {
765                self.app_state
766                    .total_failed_calls
767                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
768                let error_event = crate::event::SessionEvent::Error {
769                    track_id: self.session_id.clone(),
770                    timestamp: crate::media::get_timestamp(),
771                    sender,
772                    error: e.to_string(),
773                    code: None,
774                };
775                self.event_sender.send(error_event).ok();
776                self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
777                    .await
778                    .ok();
779                return Err(e);
780            }
781        }
782    }
783
784    async fn do_invite(&self, option: CallOption) -> Result<()> {
785        self.invite_or_accept(option, "invite".to_string())
786            .await
787            .map(|_| ())
788    }
789
790    async fn do_accept(&self, mut option: CallOption) -> Result<()> {
791        let has_pending = self
792            .invitation
793            .find_dialog_id_by_session_id(&self.session_id)
794            .is_some();
795        let ready_to_answer_val = {
796            let state = self.call_state.read().await;
797            state.ready_to_answer.is_none()
798        };
799
800        if ready_to_answer_val {
801            if !has_pending {
802                // emit reject event
803                warn!(session_id = self.session_id, "no pending call to accept");
804                let rejet_event = crate::event::SessionEvent::Reject {
805                    track_id: self.session_id.clone(),
806                    timestamp: crate::media::get_timestamp(),
807                    reason: "no pending call".to_string(),
808                    refer: None,
809                    code: Some(486),
810                };
811                self.event_sender.send(rejet_event).ok();
812                self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
813                    .await
814                    .ok();
815                return Err(anyhow::anyhow!("no pending call to accept"));
816            }
817            option = self.invite_or_accept(option, "accept".to_string()).await?;
818        } else {
819            option.check_default();
820            self.call_state.write().await.option = Some(option.clone());
821        }
822        info!(session_id = self.session_id, ?option, "accepting call");
823        let ready = self.call_state.write().await.ready_to_answer.take();
824        if let Some((answer, track, dialog)) = ready {
825            info!(
826                session_id = self.session_id,
827                track_id = track.as_ref().map(|t| t.id()),
828                "ready to answer with track"
829            );
830
831            let headers = vec![rsip::Header::ContentType(
832                "application/sdp".to_string().into(),
833            )];
834
835            match dialog.accept(Some(headers), Some(answer.as_bytes().to_vec())) {
836                Ok(_) => {
837                    {
838                        let mut state = self.call_state.write().await;
839                        state.answer = Some(answer);
840                        state.answer_time = Some(Utc::now());
841                    }
842                    self.finish_caller_stack(&option, track).await?;
843                }
844                Err(e) => {
845                    warn!(session_id = self.session_id, "failed to accept call: {}", e);
846                    return Err(anyhow::anyhow!("failed to accept call"));
847                }
848            }
849        }
850        return Ok(());
851    }
852
853    async fn do_reject(
854        &self,
855        code: Option<rsip::StatusCode>,
856        reason: Option<String>,
857    ) -> Result<()> {
858        match self
859            .invitation
860            .find_dialog_id_by_session_id(&self.session_id)
861        {
862            Some(id) => {
863                info!(
864                    session_id = self.session_id,
865                    ?reason,
866                    ?code,
867                    "rejecting call"
868                );
869                self.invitation.hangup(id, code, reason).await
870            }
871            None => Ok(()),
872        }
873    }
874
875    async fn do_ringing(
876        &self,
877        ringtone: Option<String>,
878        recorder: Option<RecorderOption>,
879        early_media: Option<bool>,
880    ) -> Result<()> {
881        let ready_to_answer_val = self.call_state.read().await.ready_to_answer.is_none();
882        if ready_to_answer_val {
883            let option = CallOption {
884                recorder,
885                ..Default::default()
886            };
887            let _ = self.invite_or_accept(option, "ringing".to_string()).await?;
888        }
889
890        let state = self.call_state.read().await;
891        if let Some((answer, _, dialog)) = state.ready_to_answer.as_ref() {
892            let (headers, body) = if early_media.unwrap_or_default() || ringtone.is_some() {
893                let headers = vec![rsip::Header::ContentType(
894                    "application/sdp".to_string().into(),
895                )];
896                (Some(headers), Some(answer.as_bytes().to_vec()))
897            } else {
898                (None, None)
899            };
900
901            dialog.ringing(headers, body).ok();
902            info!(
903                session_id = self.session_id,
904                ringtone, early_media, "playing ringtone"
905            );
906            if let Some(ringtone_url) = ringtone {
907                drop(state);
908                self.do_play(ringtone_url, None, None, None).await.ok();
909            } else {
910                info!(session_id = self.session_id, "no ringtone to play");
911            }
912        }
913        Ok(())
914    }
915
916    async fn do_tts(
917        &self,
918        text: String,
919        speaker: Option<String>,
920        play_id: Option<String>,
921        auto_hangup: Option<bool>,
922        streaming: bool,
923        end_of_stream: bool,
924        option: Option<SynthesisOption>,
925        wait_input_timeout: Option<u32>,
926        base64: bool,
927        cache_key: Option<String>,
928    ) -> Result<()> {
929        let tts_option = {
930            let call_state = self.call_state.read().await;
931            match call_state.option.clone().unwrap_or_default().tts {
932                Some(opt) => opt.merge_with(option),
933                None => {
934                    if let Some(opt) = option {
935                        opt
936                    } else {
937                        return Err(anyhow::anyhow!("no tts option available"));
938                    }
939                }
940            }
941        };
942        let speaker = match speaker {
943            Some(s) => Some(s),
944            None => tts_option.speaker.clone(),
945        };
946
947        let mut play_command = SynthesisCommand {
948            text,
949            speaker,
950            play_id: play_id.clone(),
951            streaming,
952            end_of_stream,
953            option: tts_option,
954            base64,
955            cache_key,
956        };
957        info!(
958            session_id = self.session_id,
959            provider = ?play_command.option.provider,
960            text = %play_command.text.chars().take(10).collect::<String>(),
961            speaker = play_command.speaker.as_deref(),
962            auto_hangup = auto_hangup.unwrap_or_default(),
963            play_id = play_command.play_id.as_deref(),
964            streaming = play_command.streaming,
965            end_of_stream = play_command.end_of_stream,
966            wait_input_timeout = wait_input_timeout.unwrap_or_default(),
967            is_base64 = play_command.base64,
968            cache_key = play_command.cache_key.as_deref(),
969            "new synthesis"
970        );
971
972        let ssrc = rand::random::<u32>();
973        let (should_interrupt, picked_ssrc) = {
974            let mut state = self.call_state.write().await;
975
976            let (target_ssrc, changed) = if let Some(handle) = &state.tts_handle {
977                if play_id.is_some() && state.current_play_id != play_id {
978                    (ssrc, true)
979                } else {
980                    (handle.ssrc, false)
981                }
982            } else {
983                (ssrc, false)
984            };
985
986            state.auto_hangup = match auto_hangup {
987                Some(true) => Some((target_ssrc, CallRecordHangupReason::BySystem)),
988                _ => state.auto_hangup.clone(),
989            };
990            state.wait_input_timeout = wait_input_timeout;
991
992            state.current_play_id = play_id.clone();
993            (changed, target_ssrc)
994        };
995
996        if should_interrupt {
997            let _ = self.do_interrupt(false).await;
998        }
999
1000        let existing_handle = self.call_state.read().await.tts_handle.clone();
1001        if let Some(tts_handle) = existing_handle {
1002            match tts_handle.try_send(play_command) {
1003                Ok(_) => return Ok(()),
1004                Err(e) => {
1005                    play_command = e.0;
1006                }
1007            }
1008        }
1009
1010        let (new_handle, tts_track) = StreamEngine::create_tts_track(
1011            self.app_state.stream_engine.clone(),
1012            self.cancel_token.child_token(),
1013            self.session_id.clone(),
1014            self.server_side_track_id.clone(),
1015            picked_ssrc,
1016            play_id.clone(),
1017            streaming,
1018            &play_command.option,
1019        )
1020        .await?;
1021
1022        new_handle.try_send(play_command)?;
1023        self.call_state.write().await.tts_handle = Some(new_handle);
1024        self.update_track_wrapper(tts_track, play_id).await;
1025        Ok(())
1026    }
1027
1028    async fn do_play(
1029        &self,
1030        url: String,
1031        play_id: Option<String>,
1032        auto_hangup: Option<bool>,
1033        wait_input_timeout: Option<u32>,
1034    ) -> Result<()> {
1035        let ssrc = rand::random::<u32>();
1036        info!(
1037            session_id = self.session_id,
1038            ssrc, url, play_id, auto_hangup, "play file track"
1039        );
1040
1041        let play_id = play_id.or(Some(url.clone()));
1042
1043        let file_track = FileTrack::new(self.server_side_track_id.clone())
1044            .with_play_id(play_id.clone())
1045            .with_ssrc(ssrc)
1046            .with_path(url)
1047            .with_cancel_token(self.cancel_token.child_token());
1048
1049        {
1050            let mut state = self.call_state.write().await;
1051            state.tts_handle = None;
1052            state.auto_hangup = match auto_hangup {
1053                Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
1054                _ => None,
1055            };
1056            state.wait_input_timeout = wait_input_timeout;
1057        }
1058
1059        self.update_track_wrapper(Box::new(file_track), play_id)
1060            .await;
1061        Ok(())
1062    }
1063
1064    async fn do_history(&self, speaker: String, text: String) -> Result<()> {
1065        self.event_sender
1066            .send(SessionEvent::AddHistory {
1067                sender: Some(self.session_id.clone()),
1068                timestamp: crate::media::get_timestamp(),
1069                speaker,
1070                text,
1071            })
1072            .map(|_| ())
1073            .map_err(Into::into)
1074    }
1075
1076    async fn do_interrupt(&self, graceful: bool) -> Result<()> {
1077        {
1078            let mut state = self.call_state.write().await;
1079            state.tts_handle = None;
1080            state.moh = None;
1081        }
1082        self.media_stream
1083            .remove_track(&self.server_side_track_id, graceful)
1084            .await;
1085        Ok(())
1086    }
1087    async fn do_pause(&self) -> Result<()> {
1088        Ok(())
1089    }
1090    async fn do_resume(&self) -> Result<()> {
1091        Ok(())
1092    }
1093    async fn do_hangup(
1094        &self,
1095        reason: Option<CallRecordHangupReason>,
1096        initiator: Option<String>,
1097    ) -> Result<()> {
1098        info!(
1099            session_id = self.session_id,
1100            ?reason,
1101            ?initiator,
1102            "do_hangup"
1103        );
1104
1105        // Set hangup reason based on initiator and reason
1106        let hangup_reason = match initiator.as_deref() {
1107            Some("caller") => CallRecordHangupReason::ByCaller,
1108            Some("callee") => CallRecordHangupReason::ByCallee,
1109            Some("system") => CallRecordHangupReason::Autohangup,
1110            _ => reason.unwrap_or(CallRecordHangupReason::BySystem),
1111        };
1112
1113        self.media_stream
1114            .stop(Some(hangup_reason.to_string()), initiator);
1115
1116        self.call_state
1117            .write()
1118            .await
1119            .set_hangup_reason(hangup_reason);
1120        Ok(())
1121    }
1122
1123    async fn do_refer(
1124        &self,
1125        caller: String,
1126        callee: String,
1127        refer_option: Option<ReferOption>,
1128    ) -> Result<()> {
1129        self.do_interrupt(false).await.ok();
1130        let mut moh = refer_option.as_ref().and_then(|o| o.moh.clone());
1131        if let Some(ref path) = moh {
1132            if !path.starts_with("http") && !std::path::Path::new(path).exists() {
1133                let fallback = "./config/sounds/refer_moh.wav";
1134                if std::path::Path::new(fallback).exists() {
1135                    info!(
1136                        session_id = self.session_id,
1137                        "moh {} not found, using fallback {}", path, fallback
1138                    );
1139                    moh = Some(fallback.to_string());
1140                }
1141            }
1142        }
1143        let session_id = self.session_id.clone();
1144        let track_id = self.server_side_track_id.clone();
1145
1146        let recorder = {
1147            let cs = self.call_state.read().await;
1148            cs.option
1149                .as_ref()
1150                .map(|o| o.recorder.clone())
1151                .unwrap_or_default()
1152        };
1153
1154        let call_option = CallOption {
1155            caller: Some(caller),
1156            callee: Some(callee.clone()),
1157            sip: refer_option.as_ref().and_then(|o| o.sip.clone()),
1158            asr: refer_option.as_ref().and_then(|o| o.asr.clone()),
1159            denoise: refer_option.as_ref().and_then(|o| o.denoise.clone()),
1160            recorder,
1161            ..Default::default()
1162        };
1163
1164        let mut invite_option = call_option.build_invite_option()?;
1165        invite_option.call_id = Some(self.session_id.clone());
1166
1167        let headers = invite_option.headers.get_or_insert_with(|| Vec::new());
1168
1169        {
1170            let cs = self.call_state.read().await;
1171            if let Some(opt) = cs.option.as_ref() {
1172                if let Some(callee) = opt.callee.as_ref() {
1173                    headers.push(rsip::Header::Other(
1174                        "X-Referred-To".to_string(),
1175                        callee.clone(),
1176                    ));
1177                }
1178                if let Some(caller) = opt.caller.as_ref() {
1179                    headers.push(rsip::Header::Other(
1180                        "X-Referred-From".to_string(),
1181                        caller.clone(),
1182                    ));
1183                }
1184            }
1185        }
1186
1187        headers.push(rsip::Header::Other(
1188            "X-Referred-Id".to_string(),
1189            self.session_id.clone(),
1190        ));
1191
1192        let ssrc = rand::random::<u32>();
1193        let refer_call_state = Arc::new(RwLock::new(ActiveCallState {
1194            start_time: Utc::now(),
1195            ssrc,
1196            option: Some(call_option.clone()),
1197            is_refer: true,
1198            ..Default::default()
1199        }));
1200
1201        {
1202            let mut cs = self.call_state.write().await;
1203            cs.refer_callstate.replace(refer_call_state.clone());
1204        }
1205
1206        let auto_hangup_requested = refer_option
1207            .as_ref()
1208            .and_then(|o| o.auto_hangup)
1209            .unwrap_or(true);
1210
1211        if auto_hangup_requested {
1212            self.call_state.write().await.auto_hangup =
1213                Some((ssrc, CallRecordHangupReason::ByRefer));
1214        } else {
1215            self.call_state.write().await.auto_hangup = None;
1216        }
1217
1218        let timeout_secs = refer_option.as_ref().and_then(|o| o.timeout).unwrap_or(30);
1219
1220        info!(
1221            session_id = self.session_id,
1222            ssrc,
1223            auto_hangup = auto_hangup_requested,
1224            callee,
1225            timeout_secs,
1226            "do_refer"
1227        );
1228
1229        let r = tokio::time::timeout(
1230            Duration::from_secs(timeout_secs as u64),
1231            self.create_outgoing_sip_track(
1232                self.cancel_token.child_token(),
1233                refer_call_state.clone(),
1234                &track_id,
1235                invite_option,
1236                &call_option,
1237                moh,
1238                auto_hangup_requested,
1239            ),
1240        )
1241        .await;
1242
1243        {
1244            self.call_state.write().await.moh = None;
1245        }
1246
1247        let result = match r {
1248            Ok(res) => res,
1249            Err(_) => {
1250                warn!(
1251                    session_id = session_id,
1252                    "refer sip track creation timed out after {} seconds", timeout_secs
1253                );
1254                self.event_sender
1255                    .send(SessionEvent::Reject {
1256                        track_id,
1257                        timestamp: crate::media::get_timestamp(),
1258                        reason: "Timeout when refer".into(),
1259                        code: Some(408),
1260                        refer: Some(true),
1261                    })
1262                    .ok();
1263                return Err(anyhow::anyhow!("refer sip track creation timed out").into());
1264            }
1265        };
1266
1267        match result {
1268            Ok(answer) => {
1269                self.event_sender
1270                    .send(SessionEvent::Answer {
1271                        timestamp: crate::media::get_timestamp(),
1272                        track_id,
1273                        sdp: answer,
1274                        refer: Some(true),
1275                    })
1276                    .ok();
1277            }
1278            Err(e) => {
1279                warn!(
1280                    session_id = session_id,
1281                    "failed to create refer sip track: {}", e
1282                );
1283                match &e {
1284                    rsipstack::Error::DialogError(reason, _, code) => {
1285                        self.event_sender
1286                            .send(SessionEvent::Reject {
1287                                track_id,
1288                                timestamp: crate::media::get_timestamp(),
1289                                reason: reason.clone(),
1290                                code: Some(code.code() as u32),
1291                                refer: Some(true),
1292                            })
1293                            .ok();
1294                    }
1295                    _ => {}
1296                }
1297                return Err(e.into());
1298            }
1299        }
1300        Ok(())
1301    }
1302
1303    async fn do_mute(&self, track_id: Option<String>) -> Result<()> {
1304        self.media_stream.mute_track(track_id).await;
1305        Ok(())
1306    }
1307
1308    async fn do_unmute(&self, track_id: Option<String>) -> Result<()> {
1309        self.media_stream.unmute_track(track_id).await;
1310        Ok(())
1311    }
1312
1313    pub async fn cleanup(&self) -> Result<()> {
1314        self.call_state.write().await.tts_handle = None;
1315        self.media_stream.cleanup().await.ok();
1316        Ok(())
1317    }
1318
1319    pub fn get_callrecord(&self) -> Option<CallRecord> {
1320        self.call_state.try_read().ok().map(|call_state| {
1321            call_state.build_callrecord(
1322                self.app_state.clone(),
1323                self.session_id.clone(),
1324                self.call_type.clone(),
1325            )
1326        })
1327    }
1328
1329    async fn dump_to_file(
1330        &self,
1331        dump_file: &mut File,
1332        cmd_receiver: &mut CommandReceiver,
1333        event_receiver: &mut EventReceiver,
1334    ) {
1335        loop {
1336            select! {
1337                _ = self.cancel_token.cancelled() => {
1338                    break;
1339                }
1340                Ok(cmd) = cmd_receiver.recv() => {
1341                    CallRecordEvent::write(CallRecordEventType::Command, cmd, dump_file)
1342                        .await;
1343                }
1344                Ok(event) = event_receiver.recv() => {
1345                    if matches!(event, SessionEvent::Binary{..}) {
1346                        continue;
1347                    }
1348                    CallRecordEvent::write(CallRecordEventType::Event, event, dump_file)
1349                        .await;
1350                }
1351            };
1352        }
1353    }
1354
1355    async fn dump_loop(
1356        &self,
1357        dump_events: bool,
1358        mut dump_cmd_receiver: CommandReceiver,
1359        mut dump_event_receiver: EventReceiver,
1360    ) {
1361        if !dump_events {
1362            return;
1363        }
1364
1365        let file_name = self.app_state.get_dump_events_file(&self.session_id);
1366        let mut dump_file = match File::options()
1367            .create(true)
1368            .append(true)
1369            .open(&file_name)
1370            .await
1371        {
1372            Ok(file) => file,
1373            Err(e) => {
1374                warn!(
1375                    session_id = self.session_id,
1376                    file_name, "failed to open dump events file: {}", e
1377                );
1378                return;
1379            }
1380        };
1381        self.dump_to_file(
1382            &mut dump_file,
1383            &mut dump_cmd_receiver,
1384            &mut dump_event_receiver,
1385        )
1386        .await;
1387
1388        while let Ok(event) = dump_event_receiver.try_recv() {
1389            if matches!(event, SessionEvent::Binary { .. }) {
1390                continue;
1391            }
1392            CallRecordEvent::write(CallRecordEventType::Event, event, &mut dump_file).await;
1393        }
1394    }
1395
1396    pub async fn create_rtp_track(&self, track_id: TrackId, ssrc: u32) -> Result<RtcTrack> {
1397        let mut rtc_config = RtcTrackConfig::default();
1398        rtc_config.mode = rustrtc::TransportMode::Rtp;
1399
1400        if let Some(codecs) = &self.app_state.config.codecs {
1401            let mut codec_types = Vec::new();
1402            for c in codecs {
1403                match c.to_lowercase().as_str() {
1404                    "pcmu" => codec_types.push(CodecType::PCMU),
1405                    "pcma" => codec_types.push(CodecType::PCMA),
1406                    "g722" => codec_types.push(CodecType::G722),
1407                    "g729" => codec_types.push(CodecType::G729),
1408                    "opus" => codec_types.push(CodecType::Opus),
1409                    "dtmf" | "2833" | "telephone_event" => {
1410                        codec_types.push(CodecType::TelephoneEvent)
1411                    }
1412                    _ => {}
1413                }
1414            }
1415            if !codec_types.is_empty() {
1416                rtc_config.preferred_codec = Some(codec_types[0].clone());
1417                rtc_config.codecs = codec_types;
1418            }
1419        }
1420
1421        if rtc_config.preferred_codec.is_none() {
1422            rtc_config.preferred_codec = Some(self.track_config.codec.clone());
1423        }
1424
1425        rtc_config.rtp_port_range = self
1426            .app_state
1427            .config
1428            .rtp_start_port
1429            .zip(self.app_state.config.rtp_end_port);
1430
1431        if let Some(ref external_ip) = self.app_state.config.external_ip {
1432            rtc_config.external_ip = Some(external_ip.clone());
1433        }
1434        if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
1435            rtc_config.bind_ip = Some(bind_ip.clone());
1436        }
1437
1438        rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
1439
1440        let mut track = RtcTrack::new(
1441            self.cancel_token.child_token(),
1442            track_id,
1443            self.track_config.clone(),
1444            rtc_config,
1445        )
1446        .with_ssrc(ssrc);
1447
1448        track.create().await?;
1449
1450        Ok(track)
1451    }
1452
1453    async fn setup_caller_track(&self, option: &CallOption) -> Result<()> {
1454        let hangup_headers = option
1455            .sip
1456            .as_ref()
1457            .and_then(|s| s.hangup_headers.as_ref())
1458            .map(|headers_map| {
1459                headers_map
1460                    .iter()
1461                    .map(|(k, v)| rsip::Header::Other(k.clone(), v.clone()))
1462                    .collect::<Vec<rsip::Header>>()
1463            });
1464        self.call_state.write().await.option = Some(option.clone());
1465        info!(
1466            session_id = self.session_id,
1467            call_type = ?self.call_type,
1468            "setup caller track"
1469        );
1470
1471        let track = match self.call_type {
1472            ActiveCallType::Webrtc => Some(self.create_webrtc_track().await?),
1473            ActiveCallType::WebSocket => {
1474                let audio_receiver = self.call_state.write().await.audio_receiver.take();
1475                if let Some(receiver) = audio_receiver {
1476                    Some(self.create_websocket_track(receiver).await?)
1477                } else {
1478                    None
1479                }
1480            }
1481            ActiveCallType::Sip => {
1482                if let Some(dialog_id) = self
1483                    .invitation
1484                    .find_dialog_id_by_session_id(&self.session_id)
1485                {
1486                    if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1487                        return self
1488                            .prepare_incoming_sip_track(
1489                                self.cancel_token.clone(),
1490                                self.call_state.clone(),
1491                                &self.session_id,
1492                                pending_dialog,
1493                                hangup_headers,
1494                            )
1495                            .await;
1496                    }
1497                }
1498
1499                // Auto-inject credentials from registered users if not already provided
1500                let mut option = option.clone();
1501                if option.sip.is_none()
1502                    || option
1503                        .sip
1504                        .as_ref()
1505                        .and_then(|s| s.username.as_ref())
1506                        .is_none()
1507                {
1508                    if let Some(callee) = &option.callee {
1509                        if let Some(cred) = self.app_state.find_credentials_for_callee(callee) {
1510                            if option.sip.is_none() {
1511                                option.sip = Some(crate::SipOption {
1512                                    username: Some(cred.username.clone()),
1513                                    password: Some(cred.password.clone()),
1514                                    realm: cred.realm.clone(),
1515                                    ..Default::default()
1516                                });
1517                            }
1518                        }
1519                    }
1520                }
1521
1522                let mut invite_option = option.build_invite_option()?;
1523                invite_option.call_id = Some(self.session_id.clone());
1524
1525                match self
1526                    .create_outgoing_sip_track(
1527                        self.cancel_token.clone(),
1528                        self.call_state.clone(),
1529                        &self.session_id,
1530                        invite_option,
1531                        &option,
1532                        None,
1533                        false,
1534                    )
1535                    .await
1536                {
1537                    Ok(answer) => {
1538                        self.event_sender
1539                            .send(SessionEvent::Answer {
1540                                timestamp: crate::media::get_timestamp(),
1541                                track_id: self.session_id.clone(),
1542                                sdp: answer,
1543                                refer: Some(false),
1544                            })
1545                            .ok();
1546                        return Ok(());
1547                    }
1548                    Err(e) => {
1549                        warn!(
1550                            session_id = self.session_id,
1551                            "failed to create sip track: {}", e
1552                        );
1553                        match &e {
1554                            rsipstack::Error::DialogError(reason, _, code) => {
1555                                self.event_sender
1556                                    .send(SessionEvent::Reject {
1557                                        track_id: self.session_id.clone(),
1558                                        timestamp: crate::media::get_timestamp(),
1559                                        reason: reason.clone(),
1560                                        code: Some(code.code() as u32),
1561                                        refer: Some(false),
1562                                    })
1563                                    .ok();
1564                            }
1565                            _ => {}
1566                        }
1567                        return Err(e.into());
1568                    }
1569                }
1570            }
1571            ActiveCallType::B2bua => {
1572                if let Some(dialog_id) = self
1573                    .invitation
1574                    .find_dialog_id_by_session_id(&self.session_id)
1575                {
1576                    if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1577                        return self
1578                            .prepare_incoming_sip_track(
1579                                self.cancel_token.clone(),
1580                                self.call_state.clone(),
1581                                &self.session_id,
1582                                pending_dialog,
1583                                hangup_headers,
1584                            )
1585                            .await;
1586                    }
1587                }
1588
1589                warn!(
1590                    session_id = self.session_id,
1591                    "no pending dialog found for B2BUA call"
1592                );
1593                return Err(anyhow::anyhow!(
1594                    "no pending dialog found for session_id: {}",
1595                    self.session_id
1596                ));
1597            }
1598        };
1599        match track {
1600            Some(track) => {
1601                self.finish_caller_stack(&option, Some(track)).await?;
1602            }
1603            None => {
1604                warn!(session_id = self.session_id, "no track created for caller");
1605                return Err(anyhow::anyhow!("no track created for caller"));
1606            }
1607        }
1608        Ok(())
1609    }
1610
1611    async fn finish_caller_stack(
1612        &self,
1613        option: &CallOption,
1614        track: Option<Box<dyn Track>>,
1615    ) -> Result<()> {
1616        if let Some(track) = track {
1617            self.setup_track_with_stream(&option, track).await?;
1618        }
1619
1620        {
1621            let call_state = self.call_state.read().await;
1622            if let Some(ref answer) = call_state.answer {
1623                info!(
1624                    session_id = self.session_id,
1625                    "sending answer event: {}", answer,
1626                );
1627                self.event_sender
1628                    .send(SessionEvent::Answer {
1629                        timestamp: crate::media::get_timestamp(),
1630                        track_id: self.session_id.clone(),
1631                        sdp: answer.clone(),
1632                        refer: Some(false),
1633                    })
1634                    .ok();
1635            } else {
1636                warn!(
1637                    session_id = self.session_id,
1638                    "no answer in state to send event"
1639                );
1640            }
1641        }
1642        Ok(())
1643    }
1644
1645    pub async fn setup_track_with_stream(
1646        &self,
1647        option: &CallOption,
1648        mut track: Box<dyn Track>,
1649    ) -> Result<()> {
1650        let processors = match StreamEngine::create_processors(
1651            self.app_state.stream_engine.clone(),
1652            track.as_ref(),
1653            self.cancel_token.child_token(),
1654            self.event_sender.clone(),
1655            self.media_stream.packet_sender.clone(),
1656            option,
1657        )
1658        .await
1659        {
1660            Ok(processors) => processors,
1661            Err(e) => {
1662                warn!(
1663                    session_id = self.session_id,
1664                    "failed to prepare stream processors: {}", e
1665                );
1666                vec![]
1667            }
1668        };
1669
1670        // Add all processors from the hook
1671        for processor in processors {
1672            track.append_processor(processor);
1673        }
1674
1675        self.update_track_wrapper(track, None).await;
1676        Ok(())
1677    }
1678
1679    pub async fn update_track_wrapper(&self, mut track: Box<dyn Track>, play_id: Option<String>) {
1680        let (ambiance_opt, subscribe) = {
1681            let state = self.call_state.read().await;
1682            let mut opt = state
1683                .option
1684                .as_ref()
1685                .and_then(|o| o.ambiance.clone())
1686                .unwrap_or_default();
1687
1688            if let Some(global) = &self.app_state.config.ambiance {
1689                opt.merge(global);
1690            }
1691
1692            let subscribe = state
1693                .option
1694                .as_ref()
1695                .and_then(|o| o.subscribe)
1696                .unwrap_or_default();
1697
1698            (opt, subscribe)
1699        };
1700        if track.id() == &self.server_side_track_id && ambiance_opt.path.is_some() {
1701            match AmbianceProcessor::new(ambiance_opt).await {
1702                Ok(ambiance) => {
1703                    info!(session_id = self.session_id, "loaded ambiance processor");
1704                    track.append_processor(Box::new(ambiance));
1705                }
1706                Err(e) => {
1707                    tracing::error!("failed to load ambiance wav {}", e);
1708                }
1709            }
1710        }
1711
1712        if subscribe && self.call_type != ActiveCallType::WebSocket {
1713            let (track_index, sub_track_id) = if track.id() == &self.server_side_track_id {
1714                (0, self.server_side_track_id.clone())
1715            } else {
1716                (1, self.session_id.clone())
1717            };
1718            let sub_processor =
1719                SubscribeProcessor::new(self.event_sender.clone(), sub_track_id, track_index);
1720            track.append_processor(Box::new(sub_processor));
1721        }
1722
1723        self.call_state.write().await.current_play_id = play_id.clone();
1724        self.media_stream.update_track(track, play_id).await;
1725    }
1726
1727    pub async fn create_websocket_track(
1728        &self,
1729        audio_receiver: WebsocketBytesReceiver,
1730    ) -> Result<Box<dyn Track>> {
1731        let (ssrc, codec) = {
1732            let call_state = self.call_state.read().await;
1733            (
1734                call_state.ssrc,
1735                call_state
1736                    .option
1737                    .as_ref()
1738                    .map(|o| o.codec.clone())
1739                    .unwrap_or_default(),
1740            )
1741        };
1742
1743        let ws_track = WebsocketTrack::new(
1744            self.cancel_token.child_token(),
1745            self.session_id.clone(),
1746            self.track_config.clone(),
1747            self.event_sender.clone(),
1748            audio_receiver,
1749            codec,
1750            ssrc,
1751        );
1752
1753        {
1754            let mut call_state = self.call_state.write().await;
1755            call_state.answer_time = Some(Utc::now());
1756            call_state.answer = Some("".to_string());
1757            call_state.last_status_code = 200;
1758        }
1759
1760        Ok(Box::new(ws_track))
1761    }
1762
1763    pub(super) async fn create_webrtc_track(&self) -> Result<Box<dyn Track>> {
1764        let (ssrc, option) = {
1765            let call_state = self.call_state.read().await;
1766            (
1767                call_state.ssrc,
1768                call_state.option.clone().unwrap_or_default(),
1769            )
1770        };
1771
1772        let mut rtc_config = RtcTrackConfig::default();
1773        rtc_config.mode = rustrtc::TransportMode::WebRtc; // WebRTC
1774        rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1775
1776        if let Some(codecs) = &self.app_state.config.codecs {
1777            let mut codec_types = Vec::new();
1778            for c in codecs {
1779                match c.to_lowercase().as_str() {
1780                    "pcmu" => codec_types.push(CodecType::PCMU),
1781                    "pcma" => codec_types.push(CodecType::PCMA),
1782                    "g722" => codec_types.push(CodecType::G722),
1783                    "g729" => codec_types.push(CodecType::G729),
1784                    "opus" => codec_types.push(CodecType::Opus),
1785                    "dtmf" | "2833" | "telephone_event" => {
1786                        codec_types.push(CodecType::TelephoneEvent)
1787                    }
1788                    _ => {}
1789                }
1790            }
1791            if !codec_types.is_empty() {
1792                rtc_config.preferred_codec = Some(codec_types[0].clone());
1793                rtc_config.codecs = codec_types;
1794            }
1795        }
1796
1797        if let Some(ref external_ip) = self.app_state.config.external_ip {
1798            rtc_config.external_ip = Some(external_ip.clone());
1799        }
1800        if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
1801            rtc_config.bind_ip = Some(bind_ip.clone());
1802        }
1803
1804        let mut webrtc_track = RtcTrack::new(
1805            self.cancel_token.child_token(),
1806            self.session_id.clone(),
1807            self.track_config.clone(),
1808            rtc_config,
1809        )
1810        .with_ssrc(ssrc);
1811
1812        let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1813        let offer = match option.enable_ipv6 {
1814            Some(false) | None => {
1815                strip_ipv6_candidates(option.offer.as_ref().unwrap_or(&"".to_string()))
1816            }
1817            _ => option.offer.clone().unwrap_or("".to_string()),
1818        };
1819        let answer: Option<String>;
1820        match webrtc_track.handshake(offer, timeout).await {
1821            Ok(answer_sdp) => {
1822                answer = match option.enable_ipv6 {
1823                    Some(false) | None => Some(strip_ipv6_candidates(&answer_sdp)),
1824                    Some(true) => Some(answer_sdp.to_string()),
1825                };
1826            }
1827            Err(e) => {
1828                warn!(session_id = self.session_id, "failed to setup track: {}", e);
1829                return Err(anyhow::anyhow!("Failed to setup track: {}", e));
1830            }
1831        }
1832
1833        {
1834            let mut call_state = self.call_state.write().await;
1835            call_state.answer_time = Some(Utc::now());
1836            call_state.answer = answer;
1837            call_state.last_status_code = 200;
1838        }
1839        Ok(Box::new(webrtc_track))
1840    }
1841
1842    async fn create_outgoing_sip_track(
1843        &self,
1844        cancel_token: CancellationToken,
1845        call_state_ref: ActiveCallStateRef,
1846        track_id: &String,
1847        mut invite_option: InviteOption,
1848        call_option: &CallOption,
1849        moh: Option<String>,
1850        auto_hangup: bool,
1851    ) -> Result<String, rsipstack::Error> {
1852        let ssrc = call_state_ref.read().await.ssrc;
1853        let rtp_track = self
1854            .create_rtp_track(track_id.clone(), ssrc)
1855            .await
1856            .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1857
1858        let offer = Some(
1859            rtp_track
1860                .local_description()
1861                .await
1862                .map_err(|e| rsipstack::Error::Error(e.to_string()))?,
1863        );
1864
1865        {
1866            let mut cs = call_state_ref.write().await;
1867            if let Some(o) = cs.option.as_mut() {
1868                o.offer = offer.clone();
1869            }
1870            cs.start_time = Utc::now();
1871        };
1872
1873        invite_option.offer = offer.clone().map(|s| s.into());
1874
1875        // Set contact to local SIP endpoint address if not already set explicitly
1876        // Check if contact is still default (no scheme set) or if host is localhost-like
1877        let needs_contact = invite_option.contact.scheme.is_none()
1878            || invite_option
1879                .contact
1880                .host_with_port
1881                .to_string()
1882                .starts_with("127.0.0.1");
1883
1884        if needs_contact {
1885            if let Some(addr) = self.invitation.dialog_layer.endpoint.get_addrs().first() {
1886                invite_option.contact = rsip::Uri {
1887                    scheme: Some(rsip::Scheme::Sip),
1888                    auth: None,
1889                    host_with_port: addr.addr.clone(),
1890                    params: vec![],
1891                    headers: vec![],
1892                };
1893            }
1894        }
1895
1896        let mut rtp_track_to_setup = Some(Box::new(rtp_track) as Box<dyn Track>);
1897
1898        if let Some(moh) = moh {
1899            let ssrc_and_moh = {
1900                let mut state = call_state_ref.write().await;
1901                state.moh = Some(moh.clone());
1902                if state.current_play_id.is_none() {
1903                    let ssrc = rand::random::<u32>();
1904                    Some((ssrc, moh.clone()))
1905                } else {
1906                    info!(
1907                        session_id = self.session_id,
1908                        "Something is playing, MOH will start after it ends"
1909                    );
1910                    None
1911                }
1912            };
1913
1914            if let Some((ssrc, moh_path)) = ssrc_and_moh {
1915                let file_track = FileTrack::new(self.server_side_track_id.clone())
1916                    .with_play_id(Some(moh_path.clone()))
1917                    .with_ssrc(ssrc)
1918                    .with_path(moh_path.clone())
1919                    .with_cancel_token(self.cancel_token.child_token());
1920                self.update_track_wrapper(Box::new(file_track), Some(moh_path))
1921                    .await;
1922            }
1923        } else {
1924            let track = rtp_track_to_setup.take().unwrap();
1925            self.setup_track_with_stream(&call_option, track)
1926                .await
1927                .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1928        }
1929
1930        info!(
1931            session_id = self.session_id,
1932            track_id,
1933            contact = %invite_option.contact,
1934            "invite {} -> {} offer: \n{}",
1935            invite_option.caller,
1936            invite_option.callee,
1937            offer.as_ref().map(|s| s.as_str()).unwrap_or("<NO OFFER>")
1938        );
1939
1940        let (dlg_state_sender, dlg_state_receiver) =
1941            self.invitation.dialog_layer.new_dialog_state_channel();
1942
1943        let states = InviteDialogStates {
1944            is_client: true,
1945            session_id: self.session_id.clone(),
1946            track_id: track_id.clone(),
1947            event_sender: self.event_sender.clone(),
1948            media_stream: self.media_stream.clone(),
1949            call_state: call_state_ref.clone(),
1950            cancel_token,
1951            terminated_reason: None,
1952            has_early_media: false,
1953        };
1954
1955        let hangup_headers = call_option
1956            .sip
1957            .as_ref()
1958            .and_then(|s| s.hangup_headers.as_ref())
1959            .map(|headers_map| {
1960                headers_map
1961                    .iter()
1962                    .map(|(k, v)| rsip::Header::Other(k.clone(), v.clone()))
1963                    .collect::<Vec<rsip::Header>>()
1964            });
1965
1966        let mut client_dialog_handler = DialogStateReceiverGuard::new(
1967            self.invitation.dialog_layer.clone(),
1968            dlg_state_receiver,
1969            hangup_headers,
1970        );
1971
1972        crate::spawn(async move {
1973            client_dialog_handler.process_dialog(states).await;
1974        });
1975
1976        let (dialog_id, answer) = self
1977            .invitation
1978            .invite(invite_option, dlg_state_sender)
1979            .await?;
1980
1981        self.call_state.write().await.moh = None;
1982
1983        if let Some(track) = rtp_track_to_setup {
1984            self.setup_track_with_stream(&call_option, track)
1985                .await
1986                .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1987        }
1988
1989        let answer = match answer {
1990            Some(answer) => String::from_utf8_lossy(&answer).to_string(),
1991            None => {
1992                warn!(session_id = self.session_id, "no answer received");
1993                return Err(rsipstack::Error::DialogError(
1994                    "No answer received".to_string(),
1995                    dialog_id,
1996                    rsip::StatusCode::NotAcceptableHere,
1997                ));
1998            }
1999        };
2000
2001        {
2002            let mut cs = call_state_ref.write().await;
2003            if cs.answer.is_none() {
2004                cs.answer = Some(answer.clone());
2005            }
2006            if auto_hangup {
2007                cs.auto_hangup = Some((ssrc, CallRecordHangupReason::ByRefer));
2008            }
2009        }
2010
2011        self.media_stream
2012            .update_remote_description(&track_id, &answer)
2013            .await
2014            .ok();
2015
2016        Ok(answer)
2017    }
2018
2019    /// Detect if SDP is WebRTC format
2020    pub fn is_webrtc_sdp(sdp: &str) -> bool {
2021        (sdp.contains("a=ice-ufrag:") || sdp.contains("a=ice-pwd:"))
2022            && sdp.contains("a=fingerprint:")
2023    }
2024
2025    pub async fn setup_answer_track(
2026        &self,
2027        ssrc: u32,
2028        option: &CallOption,
2029        offer: String,
2030    ) -> Result<(String, Box<dyn Track>)> {
2031        let offer = match option.enable_ipv6 {
2032            Some(false) | None => strip_ipv6_candidates(&offer),
2033            _ => offer.clone(),
2034        };
2035
2036        let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
2037
2038        let mut media_track = if Self::is_webrtc_sdp(&offer) {
2039            let mut rtc_config = RtcTrackConfig::default();
2040            rtc_config.mode = rustrtc::TransportMode::WebRtc;
2041            rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
2042            if let Some(ref external_ip) = self.app_state.config.external_ip {
2043                rtc_config.external_ip = Some(external_ip.clone());
2044            }
2045            if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
2046                rtc_config.bind_ip = Some(bind_ip.clone());
2047            }
2048            rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
2049
2050            let webrtc_track = RtcTrack::new(
2051                self.cancel_token.child_token(),
2052                self.session_id.clone(),
2053                self.track_config.clone(),
2054                rtc_config,
2055            )
2056            .with_ssrc(ssrc);
2057
2058            Box::new(webrtc_track) as Box<dyn Track>
2059        } else {
2060            let rtp_track = self.create_rtp_track(self.session_id.clone(), ssrc).await?;
2061            Box::new(rtp_track) as Box<dyn Track>
2062        };
2063
2064        let answer = match media_track.handshake(offer.clone(), timeout).await {
2065            Ok(answer) => answer,
2066            Err(e) => {
2067                return Err(anyhow::anyhow!("handshake failed: {e}"));
2068            }
2069        };
2070
2071        return Ok((answer, media_track));
2072    }
2073
2074    pub async fn prepare_incoming_sip_track(
2075        &self,
2076        cancel_token: CancellationToken,
2077        call_state_ref: ActiveCallStateRef,
2078        track_id: &String,
2079        pending_dialog: PendingDialog,
2080        hangup_headers: Option<Vec<rsip::Header>>,
2081    ) -> Result<()> {
2082        let state_receiver = pending_dialog.state_receiver;
2083        //let pending_token_clone = pending_dialog.token;
2084
2085        let states = InviteDialogStates {
2086            is_client: false,
2087            session_id: self.session_id.clone(),
2088            track_id: track_id.clone(),
2089            event_sender: self.event_sender.clone(),
2090            media_stream: self.media_stream.clone(),
2091            call_state: self.call_state.clone(),
2092            cancel_token,
2093            terminated_reason: None,
2094            has_early_media: false,
2095        };
2096
2097        let initial_request = pending_dialog.dialog.initial_request();
2098        let offer = String::from_utf8_lossy(&initial_request.body).to_string();
2099
2100        let (ssrc, option) = {
2101            let call_state = call_state_ref.read().await;
2102            (
2103                call_state.ssrc,
2104                call_state.option.clone().unwrap_or_default(),
2105            )
2106        };
2107
2108        match self.setup_answer_track(ssrc, &option, offer).await {
2109            Ok((offer, track)) => {
2110                self.setup_track_with_stream(&option, track).await?;
2111                {
2112                    let mut state = self.call_state.write().await;
2113                    state.ready_to_answer = Some((offer, None, pending_dialog.dialog));
2114                }
2115            }
2116            Err(e) => {
2117                return Err(anyhow::anyhow!("error creating track: {}", e));
2118            }
2119        }
2120
2121        let mut client_dialog_handler = DialogStateReceiverGuard::new(
2122            self.invitation.dialog_layer.clone(),
2123            state_receiver,
2124            hangup_headers,
2125        );
2126
2127        crate::spawn(async move {
2128            client_dialog_handler.process_dialog(states).await;
2129        });
2130        Ok(())
2131    }
2132}
2133
2134impl Drop for ActiveCall {
2135    fn drop(&mut self) {
2136        info!(session_id = self.session_id, "dropping active call");
2137        if let Some(sender) = self.app_state.callrecord_sender.as_ref() {
2138            if let Some(record) = self.get_callrecord() {
2139                if let Err(e) = sender.send(record) {
2140                    warn!(
2141                        session_id = self.session_id,
2142                        "failed to send call record: {}", e
2143                    );
2144                }
2145            }
2146        }
2147    }
2148}
2149
2150impl ActiveCallState {
2151    pub fn merge_option(&self, mut option: CallOption) -> CallOption {
2152        if let Some(existing) = &self.option {
2153            if option.asr.is_none() {
2154                option.asr = existing.asr.clone();
2155            }
2156            if option.tts.is_none() {
2157                option.tts = existing.tts.clone();
2158            }
2159            if option.vad.is_none() {
2160                option.vad = existing.vad.clone();
2161            }
2162            if option.denoise.is_none() {
2163                option.denoise = existing.denoise;
2164            }
2165            if option.recorder.is_none() {
2166                option.recorder = existing.recorder.clone();
2167            }
2168            if option.eou.is_none() {
2169                option.eou = existing.eou.clone();
2170            }
2171            if option.extra.is_none() {
2172                option.extra = existing.extra.clone();
2173            }
2174            if option.ambiance.is_none() {
2175                option.ambiance = existing.ambiance.clone();
2176            }
2177        }
2178        option
2179    }
2180
2181    pub fn set_hangup_reason(&mut self, reason: CallRecordHangupReason) {
2182        if self.hangup_reason.is_none() {
2183            self.hangup_reason = Some(reason);
2184        }
2185    }
2186
2187    pub fn build_hangup_event(
2188        &self,
2189        track_id: TrackId,
2190        initiator: Option<String>,
2191    ) -> crate::event::SessionEvent {
2192        let from = self.option.as_ref().and_then(|o| o.caller.as_ref());
2193        let to = self.option.as_ref().and_then(|o| o.callee.as_ref());
2194        let extra = self.extras.clone();
2195
2196        crate::event::SessionEvent::Hangup {
2197            track_id,
2198            timestamp: crate::media::get_timestamp(),
2199            reason: Some(format!("{:?}", self.hangup_reason)),
2200            initiator,
2201            start_time: self.start_time.to_rfc3339(),
2202            answer_time: self.answer_time.map(|t| t.to_rfc3339()),
2203            ringing_time: self.ring_time.map(|t| t.to_rfc3339()),
2204            hangup_time: Utc::now().to_rfc3339(),
2205            extra,
2206            from: from.map(|f| f.into()),
2207            to: to.map(|f| f.into()),
2208            refer: Some(self.is_refer),
2209        }
2210    }
2211
2212    pub fn build_callrecord(
2213        &self,
2214        app_state: AppState,
2215        session_id: String,
2216        call_type: ActiveCallType,
2217    ) -> CallRecord {
2218        let option = self.option.clone().unwrap_or_default();
2219        let recorder = if option.recorder.is_some() {
2220            let recorder_file = app_state.get_recorder_file(&session_id);
2221            if std::path::Path::new(&recorder_file).exists() {
2222                let file_size = std::fs::metadata(&recorder_file)
2223                    .map(|m| m.len())
2224                    .unwrap_or(0);
2225                vec![crate::callrecord::CallRecordMedia {
2226                    track_id: session_id.clone(),
2227                    path: recorder_file,
2228                    size: file_size,
2229                    extra: None,
2230                }]
2231            } else {
2232                vec![]
2233            }
2234        } else {
2235            vec![]
2236        };
2237
2238        let dump_event_file = app_state.get_dump_events_file(&session_id);
2239        let dump_event_file = if std::path::Path::new(&dump_event_file).exists() {
2240            Some(dump_event_file)
2241        } else {
2242            None
2243        };
2244
2245        let refer_callrecord = self.refer_callstate.as_ref().and_then(|rc| {
2246            if let Ok(rc) = rc.try_read() {
2247                Some(Box::new(rc.build_callrecord(
2248                    app_state.clone(),
2249                    rc.session_id.clone(),
2250                    ActiveCallType::B2bua,
2251                )))
2252            } else {
2253                None
2254            }
2255        });
2256
2257        let caller = option.caller.clone().unwrap_or_default();
2258        let callee = option.callee.clone().unwrap_or_default();
2259
2260        CallRecord {
2261            option: Some(option),
2262            call_id: session_id,
2263            call_type,
2264            start_time: self.start_time,
2265            ring_time: self.ring_time.clone(),
2266            answer_time: self.answer_time.clone(),
2267            end_time: Utc::now(),
2268            caller,
2269            callee,
2270            hangup_reason: self.hangup_reason.clone(),
2271            hangup_messages: Vec::new(),
2272            status_code: self.last_status_code,
2273            extras: self.extras.clone(),
2274            dump_event_file,
2275            recorder,
2276            refer_callrecord,
2277        }
2278    }
2279}