Skip to main content

active_call/call/
active_call.rs

1use super::Command;
2use crate::{
3    CallOption, ReferOption,
4    event::{EventReceiver, EventSender, SessionEvent},
5    media::{
6        TrackId,
7        ambiance::AmbianceProcessor,
8        engine::StreamEngine,
9        negotiate::strip_ipv6_candidates,
10        processor::SubscribeProcessor,
11        recorder::RecorderOption,
12        stream::{MediaStream, MediaStreamBuilder},
13        track::{
14            Track, TrackConfig,
15            file::FileTrack,
16            media_pass::MediaPassTrack,
17            rtc::{RtcTrack, RtcTrackConfig},
18            tts::SynthesisHandle,
19            websocket::{WebsocketBytesReceiver, WebsocketTrack},
20        },
21    },
22    synthesis::{SynthesisCommand, SynthesisOption},
23    transcription::TranscriptionOption,
24};
25use crate::{
26    app::AppState,
27    call::{
28        CommandReceiver, CommandSender,
29        sip::{DialogStateReceiverGuard, Invitation, InviteDialogStates},
30    },
31    callrecord::{CallRecord, CallRecordEvent, CallRecordEventType, CallRecordHangupReason},
32    useragent::{
33        invitation::PendingDialog,
34        public_address::{
35            build_public_contact_uri, contact_needs_public_resolution, find_local_addr_for_uri,
36        },
37    },
38};
39use anyhow::Result;
40use audio_codec::CodecType;
41use chrono::{DateTime, Utc};
42use rsipstack::dialog::{invitation::InviteOption, server_dialog::ServerInviteDialog};
43use serde::{Deserialize, Serialize};
44use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
45use tokio::{fs::File, select, sync::Mutex, sync::RwLock, time::sleep};
46use tokio_util::sync::CancellationToken;
47use tracing::{debug, info, warn};
48
// Tests covering which SSRC `ActiveCall::do_tts` records in `auto_hangup`,
// depending on whether the requested play id matches the current TTS handle.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::app::AppStateBuilder;
    use crate::callrecord::CallRecordHangupReason;
    use crate::config::Config;
    use crate::media::track::tts::SynthesisHandle;
    use crate::synthesis::SynthesisCommand;
    use tokio::sync::mpsc;

    /// When `do_tts` is called with the same play id as the installed TTS handle,
    /// the auto-hangup entry must carry that handle's SSRC and the text must be
    /// delivered through the handle's existing command channel.
    #[tokio::test]
    async fn test_tts_ssrc_reuse_for_autohangup() -> Result<()> {
        let mut config = Config::default();
        config.udp_port = 0; // Use random port
        config.media_cache_path = "/tmp/mediacache".to_string();
        let stream_engine = Arc::new(StreamEngine::default());
        let app_state = AppStateBuilder::new()
            .with_config(config)
            .with_stream_engine(stream_engine)
            .build()
            .await?;

        let cancel_token = CancellationToken::new();
        let session_id = "test-session".to_string();
        let track_config = TrackConfig::default();

        let mut option = crate::CallOption::default();
        option.tts = Some(crate::synthesis::SynthesisOption::default());

        // Trailing arguments follow `ActiveCall::new`: audio_receiver, dump_events,
        // server_side_track_id, extras, sip_hangup_headers_template.
        let active_call = Arc::new(ActiveCall::new(
            ActiveCallType::Sip,
            cancel_token.clone(),
            session_id.clone(),
            app_state.invitation.clone(),
            app_state.clone(),
            track_config,
            None,
            false,
            None,
            None,
            None,
        ));

        // Install the call option (with TTS settings) before issuing the TTS command.
        {
            let mut state = active_call.call_state.write().await;
            state.option = Some(option);
        }

        // `rx` is the receiving end of the handle's command channel; it is
        // inspected in step 4 to prove the existing handle was reused.
        let (tx, mut rx) = mpsc::unbounded_channel::<SynthesisCommand>();
        let initial_ssrc = 12345;
        let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);

        // 1. Set initial TTS handle
        {
            let mut state = active_call.call_state.write().await;
            state.tts_handle = Some(handle);
            state.current_play_id = Some("play_1".to_string());
        }

        // 2. Call do_tts with auto_hangup=true and same play_id
        // Argument order (as forwarded by `dispatch`): text, speaker, play_id,
        // auto_hangup, streaming, end_of_stream, option, wait_input_timeout,
        // base64, cache_key.
        active_call
            .do_tts(
                "hangup now".to_string(),
                None,
                Some("play_1".to_string()),
                Some(true),
                false,
                true,
                None,
                None,
                false,
                None,
            )
            .await?;

        // 3. Verify auto_hangup state uses the EXISTING SSRC
        {
            let state = active_call.call_state.read().await;
            assert!(state.auto_hangup.is_some());
            let (h_ssrc, reason) = state.auto_hangup.clone().unwrap();
            assert_eq!(
                h_ssrc, initial_ssrc,
                "SSRC should be reused from existing handle"
            );
            assert_eq!(reason, CallRecordHangupReason::BySystem);
        }

        // 4. Verify command was sent to existing channel
        let cmd = rx.try_recv().expect("Should have received tts command");
        assert_eq!(cmd.text, "hangup now");

        Ok(())
    }

    /// When `do_tts` is called with a play id that differs from the installed
    /// TTS handle's, the auto-hangup entry must carry a different SSRC than the
    /// old handle's.
    #[tokio::test]
    async fn test_tts_new_ssrc_for_different_play_id() -> Result<()> {
        let mut config = Config::default();
        config.udp_port = 0; // Use random port
        config.media_cache_path = "/tmp/mediacache".to_string();
        let stream_engine = Arc::new(StreamEngine::default());
        let app_state = AppStateBuilder::new()
            .with_config(config)
            .with_stream_engine(stream_engine)
            .build()
            .await?;

        // Same constructor arguments as the previous test, built inline.
        let active_call = Arc::new(ActiveCall::new(
            ActiveCallType::Sip,
            CancellationToken::new(),
            "test-session".to_string(),
            app_state.invitation.clone(),
            app_state.clone(),
            TrackConfig::default(),
            None,
            false,
            None,
            None,
            None,
        ));

        // This test selects an explicit synthesis provider instead of relying on
        // the default option. NOTE(review): presumably required so a fresh TTS
        // track can be created without an existing handle; confirm against do_tts.
        let mut tts_opt = crate::synthesis::SynthesisOption::default();
        tts_opt.provider = Some(crate::synthesis::SynthesisType::Aliyun);
        let mut option = crate::CallOption::default();
        option.tts = Some(tts_opt);
        {
            let mut state = active_call.call_state.write().await;
            state.option = Some(option);
        }

        // The receiver is kept alive (`_rx`) but never read in this test.
        let (tx, _rx) = mpsc::unbounded_channel();
        let initial_ssrc = 111;
        let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);

        {
            let mut state = active_call.call_state.write().await;
            state.tts_handle = Some(handle);
            state.current_play_id = Some("play_1".to_string());
        }

        // Call do_tts with DIFFERENT play_id
        active_call
            .do_tts(
                "new play".to_string(),
                None,
                Some("play_2".to_string()),
                Some(true),
                false,
                true,
                None,
                None,
                false,
                None,
            )
            .await?;

        // Verify auto_hangup uses a NEW SSRC (because it should interrupt and start fresh)
        {
            let state = active_call.call_state.read().await;
            let (h_ssrc, _) = state.auto_hangup.clone().unwrap();
            assert_ne!(
                h_ssrc, initial_ssrc,
                "Should use a new SSRC for different play_id"
            );
        }

        Ok(())
    }
}
217
/// Deserializable parameters for setting up a call.
///
/// Field names are camelCase on the wire unless overridden below, so
/// `server_side_track` is read from `serverSideTrack`.
// NOTE(review): presumably parsed from the request that opens a call session;
// the call site is not visible in this file section.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallParams {
    /// Optional identifier supplied by the caller.
    pub id: Option<String>,
    /// Read from the key `dump`.
    #[serde(rename = "dump")]
    pub dump_events: Option<bool>,
    /// Read from the key `ping`. Unit is not established in this file section.
    #[serde(rename = "ping")]
    pub ping_interval: Option<u32>,
    /// Optional track id; presumably forwarded as `server_side_track_id` to
    /// `ActiveCall::new` (verify against the caller).
    pub server_side_track: Option<String>,
}
228
/// The kind of call an [`ActiveCall`] represents.
///
/// Variants are (de)serialized in camelCase (`webrtc`, `b2bua`, `webSocket`,
/// `sip`). Note that `ActiveCall::new` separately maps `WebSocket` to the
/// all-lowercase string `"websocket"` for the built-in call-type session variable.
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub enum ActiveCallType {
    Webrtc,
    B2bua,
    WebSocket,
    /// The default call type.
    #[default]
    Sip,
}
238
/// Mutable per-call state, shared behind an `RwLock` (see `ActiveCallStateRef`).
#[derive(Default)]
pub struct ActiveCallState {
    /// Session id of the owning call; set by `ActiveCall::new`.
    pub session_id: String,
    /// Creation time of the call; set to `Utc::now()` by `ActiveCall::new`.
    pub start_time: DateTime<Utc>,
    pub ring_time: Option<DateTime<Utc>>,
    pub answer_time: Option<DateTime<Utc>>,
    pub hangup_reason: Option<CallRecordHangupReason>,
    pub last_status_code: u16,
    /// Call options; the tests in this file install TTS settings through it.
    pub option: Option<CallOption>,
    pub answer: Option<String>,
    /// Initialized to a random value by `ActiveCall::new`.
    pub ssrc: u32,
    /// State of a referred call, if any. Its `ssrc` is compared against
    /// `pending_asr_resume` when a refer hangup event arrives.
    pub refer_callstate: Option<ActiveCallStateRef>,
    /// Session variables. `ActiveCall::new` always stores `Some` here and adds the
    /// built-in session id, call type and start time entries when absent.
    pub extras: Option<HashMap<String, serde_json::Value>>,
    pub is_refer: bool,
    pub sip_hangup_headers_template: Option<HashMap<String, String>>,

    // Runtime state (migrated from ActiveCall to reduce multiple locks)
    /// Handle of the current TTS track, if one is installed.
    pub tts_handle: Option<SynthesisHandle>,
    /// `(ssrc, reason)`: when the server-side track with this SSRC ends, the call
    /// is hung up with `reason`.
    pub auto_hangup: Option<(u32, CallRecordHangupReason)>,
    /// Pending input timeout; taken (cleared) when the current playback ends and
    /// then used to arm the wait-for-input timer.
    pub wait_input_timeout: Option<u32>,
    /// Path of a music-on-hold file; while set, it is replayed whenever the
    /// server-side track ends or reports an error.
    pub moh: Option<String>,
    /// Play id of the playback currently considered active; track-end events
    /// carrying a different play id are ignored.
    pub current_play_id: Option<String>,
    pub audio_receiver: Option<WebsocketBytesReceiver>,
    // NOTE(review): tuple presumably holds (answer SDP, prepared track, dialog)
    // for a deferred answer; confirm against the accept/ringing handlers.
    pub ready_to_answer: Option<(String, Option<Box<dyn Track>>, ServerInviteDialog)>,
    /// `(refer ssrc, ASR option)` used to re-create the ASR processor after the
    /// referred call hangs up.
    pub pending_asr_resume: Option<(u32, TranscriptionOption)>,
}
265
/// Shared, reference-counted handle to an [`ActiveCall`].
pub type ActiveCallRef = Arc<ActiveCall>;
/// Shared handle to an [`ActiveCallState`] guarded by an async `RwLock`.
pub type ActiveCallStateRef = Arc<RwLock<ActiveCallState>>;
268
/// A single live call session: its state, media stream, and the channels used
/// to feed it commands and observe its events.
pub struct ActiveCall {
    /// Mutable call state shared behind an `RwLock`.
    pub call_state: ActiveCallStateRef,
    /// Cancelled by `serve` when the call ends; child tokens are handed to the
    /// media stream and to tracks created for this call.
    pub cancel_token: CancellationToken,
    pub call_type: ActiveCallType,
    pub session_id: String,
    /// Media stream built in `ActiveCall::new` with the session id as its id.
    pub media_stream: Arc<MediaStream>,
    pub track_config: TrackConfig,
    /// Sender side of the session event channel.
    pub event_sender: EventSender,
    pub app_state: AppState,
    pub invitation: Invitation,
    /// Broadcast sender for commands (capacity 32, see `ActiveCall::new`).
    pub cmd_sender: CommandSender,
    /// Passed to `dump_loop` by `serve`.
    pub dump_events: bool,
    /// Id of the server-side track; defaults to `"server-side-track"`.
    pub server_side_track_id: TrackId,
}
283
/// Registration guard for an active call: constructing it inserts the call into
/// `app_state.active_calls`, and dropping it removes the entry again.
pub struct ActiveCallGuard {
    /// The registered call.
    pub call: ActiveCallRef,
    /// Number of registered calls right after this one was inserted.
    pub active_calls: usize,
}
288
289impl ActiveCallGuard {
290    pub fn new(call: ActiveCallRef) -> Self {
291        let active_calls = {
292            call.app_state
293                .total_calls
294                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
295            let mut calls = call.app_state.active_calls.lock().unwrap();
296            calls.insert(call.session_id.clone(), call.clone());
297            calls.len()
298        };
299        Self { call, active_calls }
300    }
301}
302
303impl Drop for ActiveCallGuard {
304    fn drop(&mut self) {
305        self.call
306            .app_state
307            .active_calls
308            .lock()
309            .unwrap()
310            .remove(&self.call.session_id);
311    }
312}
313
/// Bundle of receivers created by `ActiveCall::new_receiver` and consumed by
/// `ActiveCall::serve`.
pub struct ActiveCallReceiver {
    /// Command subscription drained by the command-dispatch loop in `serve`.
    pub cmd_receiver: CommandReceiver,
    /// Second, independent command subscription handed to `dump_loop`.
    pub dump_cmd_receiver: CommandReceiver,
    /// Event subscription handed to `dump_loop`.
    pub dump_event_receiver: EventReceiver,
}
319
320impl ActiveCall {
321    pub fn new(
322        call_type: ActiveCallType,
323        cancel_token: CancellationToken,
324        session_id: String,
325        invitation: Invitation,
326        app_state: AppState,
327        track_config: TrackConfig,
328        audio_receiver: Option<WebsocketBytesReceiver>,
329        dump_events: bool,
330        server_side_track_id: Option<TrackId>,
331        extras: Option<HashMap<String, serde_json::Value>>,
332        sip_hangup_headers_template: Option<HashMap<String, String>>,
333    ) -> Self {
334        let event_sender = crate::event::create_event_sender();
335        let cmd_sender = tokio::sync::broadcast::Sender::<Command>::new(32);
336        let media_stream_builder = MediaStreamBuilder::new(event_sender.clone())
337            .with_id(session_id.clone())
338            .with_cancel_token(cancel_token.child_token());
339        let media_stream = Arc::new(media_stream_builder.build());
340        let start_time = Utc::now();
341        // Inject built-in session variables into extras
342        let call_type_str = match &call_type {
343            ActiveCallType::Sip => "sip",
344            ActiveCallType::WebSocket => "websocket",
345            ActiveCallType::Webrtc => "webrtc",
346            ActiveCallType::B2bua => "b2bua",
347        };
348        let extras = {
349            let mut e = extras.unwrap_or_default();
350            e.entry(crate::playbook::BUILTIN_SESSION_ID.to_string())
351                .or_insert_with(|| serde_json::Value::String(session_id.clone()));
352            e.entry(crate::playbook::BUILTIN_CALL_TYPE.to_string())
353                .or_insert_with(|| serde_json::Value::String(call_type_str.to_string()));
354            e.entry(crate::playbook::BUILTIN_START_TIME.to_string())
355                .or_insert_with(|| serde_json::Value::String(start_time.to_rfc3339()));
356            Some(e)
357        };
358        let call_state = Arc::new(RwLock::new(ActiveCallState {
359            session_id: session_id.clone(),
360            start_time,
361            ssrc: rand::random::<u32>(),
362            extras,
363            audio_receiver,
364            sip_hangup_headers_template,
365            ..Default::default()
366        }));
367        Self {
368            cancel_token,
369            call_type,
370            session_id,
371            call_state,
372            media_stream,
373            track_config,
374            event_sender,
375            app_state,
376            invitation,
377            cmd_sender,
378            dump_events,
379            server_side_track_id: server_side_track_id.unwrap_or("server-side-track".to_string()),
380        }
381    }
382
383    pub async fn enqueue_command(&self, command: Command) -> Result<()> {
384        self.cmd_sender
385            .send(command)
386            .map_err(|e| anyhow::anyhow!("Failed to send command: {}", e))?;
387        Ok(())
388    }
389
390    /// Create a new ActiveCallReceiver for this ActiveCall
391    /// `tokio::sync::broadcast` not cached messages, so need to early create receiver
392    /// before calling `serve()`
393    pub fn new_receiver(&self) -> ActiveCallReceiver {
394        ActiveCallReceiver {
395            cmd_receiver: self.cmd_sender.subscribe(),
396            dump_cmd_receiver: self.cmd_sender.subscribe(),
397            dump_event_receiver: self.event_sender.subscribe(),
398        }
399    }
400
401    pub async fn serve(&self, receiver: ActiveCallReceiver) -> Result<()> {
402        let ActiveCallReceiver {
403            mut cmd_receiver,
404            dump_cmd_receiver,
405            dump_event_receiver,
406        } = receiver;
407
408        let process_command_loop = async move {
409            while let Ok(command) = cmd_receiver.recv().await {
410                match self.dispatch(command).await {
411                    Ok(_) => (),
412                    Err(e) => {
413                        warn!(session_id = self.session_id, "{}", e);
414                        self.event_sender
415                            .send(SessionEvent::Error {
416                                track_id: self.session_id.clone(),
417                                timestamp: crate::media::get_timestamp(),
418                                sender: "command".to_string(),
419                                error: e.to_string(),
420                                code: None,
421                            })
422                            .ok();
423                    }
424                }
425            }
426        };
427        self.app_state
428            .total_calls
429            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
430
431        tokio::join!(
432            self.dump_loop(self.dump_events, dump_cmd_receiver, dump_event_receiver),
433            async {
434                select! {
435                    _ = process_command_loop => {
436                        info!(session_id = self.session_id, "command loop done");
437                    }
438                    _ = self.process() => {
439                        info!(session_id = self.session_id, "call serve done");
440                    }
441                    _ = self.cancel_token.cancelled() => {
442                        info!(session_id = self.session_id, "call cancelled - cleaning up resources");
443                    }
444                }
445                self.cancel_token.cancel();
446            }
447        );
448        Ok(())
449    }
450
    /// Core per-call loop.
    ///
    /// Races three futures and returns `Ok(())` as soon as any of them completes:
    /// a poller that fires the wait-for-input timeout, the media stream's
    /// `serve()`, and a hook loop that reacts to session events (playback end,
    /// interrupt, inactivity, refer hangup, track errors).
    async fn process(&self) -> Result<()> {
        let mut event_receiver = self.event_sender.subscribe();

        // Shared timer state: (start timestamp, timeout). A timeout of 0 means the
        // timer is disarmed. Both values are compared against
        // `crate::media::get_timestamp()`, so they share its unit
        // (presumably milliseconds; confirm).
        let input_timeout_expire = Arc::new(Mutex::new((0u64, 0u32)));
        let input_timeout_expire_ref = input_timeout_expire.clone();
        let event_sender = self.event_sender.clone();
        // Polls the timer every 100 ms; when it has expired, disarms it and emits a
        // `Silence` event on the server-side track. This loop never exits by itself.
        let wait_input_timeout_loop = async {
            loop {
                let (start_time, expire) = { *input_timeout_expire.lock().await };
                if expire > 0 && crate::media::get_timestamp() >= start_time + expire as u64 {
                    info!(session_id = self.session_id, "wait input timeout reached");
                    *input_timeout_expire.lock().await = (0, 0);
                    event_sender
                        .send(SessionEvent::Silence {
                            track_id: self.server_side_track_id.clone(),
                            timestamp: crate::media::get_timestamp(),
                            start_time,
                            duration: expire as u64,
                            samples: None,
                        })
                        .ok();
                }
                sleep(Duration::from_millis(100)).await;
            }
        };
        let server_side_track_id = self.server_side_track_id.clone();
        // Reacts to session events until `recv` returns an error.
        // NOTE(review): if the event channel is a broadcast channel, a lagging
        // receiver also produces an error here and would end this loop; confirm.
        let event_hook_loop = async move {
            while let Ok(event) = event_receiver.recv().await {
                match event {
                    // Any sign of activity disarms the wait-for-input timer.
                    SessionEvent::Speaking { .. }
                    | SessionEvent::Dtmf { .. }
                    | SessionEvent::AsrDelta { .. }
                    | SessionEvent::AsrFinal { .. }
                    | SessionEvent::TrackStart { .. } => {
                        *input_timeout_expire_ref.lock().await = (0, 0);
                    }
                    SessionEvent::TrackEnd {
                        track_id,
                        play_id,
                        ssrc,
                        ..
                    } => {
                        // Only the server-side track's playback is tracked here.
                        if track_id != server_side_track_id {
                            continue;
                        }

                        // Snapshot what is needed under the write lock, then release
                        // it before doing any further async work.
                        let (moh_path, auto_hangup, wait_timeout_val) = {
                            let mut state = self.call_state.write().await;
                            // A track end for a play id that is no longer current
                            // belongs to an interrupted playback; ignore it.
                            if play_id != state.current_play_id {
                                debug!(
                                    session_id = self.session_id,
                                    ?play_id,
                                    current = ?state.current_play_id,
                                    "ignoring interrupted track end"
                                );
                                continue;
                            }
                            state.current_play_id = None;
                            (
                                state.moh.clone(),
                                state.auto_hangup.clone(),
                                state.wait_input_timeout.take(),
                            )
                        };

                        // While music-on-hold is set, restart it with a fresh SSRC.
                        // This skips the auto-hangup and timer handling below.
                        if let Some(path) = moh_path {
                            info!(session_id = self.session_id, "looping moh: {}", path);
                            let ssrc = rand::random::<u32>();
                            let file_track = FileTrack::new(self.server_side_track_id.clone())
                                .with_play_id(Some(path.clone()))
                                .with_ssrc(ssrc)
                                .with_path(path.clone())
                                .with_cancel_token(self.cancel_token.child_token());
                            self.update_track_wrapper(Box::new(file_track), Some(path))
                                .await;
                            continue;
                        }

                        // Hang up only if the track that ended is the one the
                        // auto-hangup was registered for.
                        if let Some((hangup_ssrc, hangup_reason)) = auto_hangup {
                            if hangup_ssrc == ssrc {
                                info!(
                                    session_id = self.session_id,
                                    ssrc, "auto hangup when track end track_id:{}", track_id
                                );
                                self.do_hangup(Some(hangup_reason), None, None).await.ok();
                            }
                        }

                        // Arm the wait-for-input timer from now, or disarm it when
                        // the requested timeout is 0.
                        if let Some(timeout) = wait_timeout_val {
                            let expire = if timeout > 0 {
                                (crate::media::get_timestamp(), timeout)
                            } else {
                                (0, 0)
                            };
                            *input_timeout_expire_ref.lock().await = expire;
                        }
                    }
                    SessionEvent::Interrupt { receiver } => {
                        // An interrupt without a receiver targets the server-side track.
                        let track_id =
                            receiver.unwrap_or_else(|| self.server_side_track_id.clone());
                        if track_id == self.server_side_track_id {
                            debug!(
                                session_id = self.session_id,
                                "received interrupt event, stopping playback"
                            );
                            self.do_interrupt(true).await.ok();
                        }
                    }
                    SessionEvent::Inactivity { track_id, .. } => {
                        info!(
                            session_id = self.session_id,
                            track_id, "inactivity timeout reached, hanging up"
                        );
                        self.do_hangup(Some(CallRecordHangupReason::InactivityTimeout), None, None)
                            .await
                            .ok();
                    }
                    SessionEvent::Hangup { refer, .. } => {
                        // Check if we need to resume ASR after refer hangup
                        if refer == Some(true) {
                            let mut cs = self.call_state.write().await;
                            // NOTE(review): `take()` clears the pending resume before
                            // the check below. If the check fails (ssrc mismatch, or
                            // `try_read` cannot get the lock), the pending ASR option
                            // is dropped and never resumed; confirm this is intended.
                            if let Some((refer_ssrc, asr_option)) = cs.pending_asr_resume.take() {
                                // Verify it's the refer call that ended
                                let is_refer_hangup = cs
                                    .refer_callstate
                                    .as_ref()
                                    .map(|rcs| {
                                        rcs.try_read()
                                            .map(|g| g.ssrc == refer_ssrc)
                                            .unwrap_or(false)
                                    })
                                    .unwrap_or(false);

                                if is_refer_hangup {
                                    drop(cs); // Release lock before async operations
                                    info!(
                                        session_id = self.session_id,
                                        "Refer call ended, resuming parent ASR"
                                    );

                                    // Resume ASR
                                    match self
                                        .app_state
                                        .stream_engine
                                        .create_asr_processor(
                                            self.server_side_track_id.clone(),
                                            self.cancel_token.child_token(),
                                            asr_option,
                                            self.event_sender.clone(),
                                        )
                                        .await
                                    {
                                        Ok(asr_processor) => {
                                            if let Err(e) = self
                                                .media_stream
                                                .append_processor(
                                                    &self.server_side_track_id,
                                                    asr_processor,
                                                )
                                                .await
                                            {
                                                warn!(
                                                    session_id = self.session_id,
                                                    "Failed to resume ASR after refer: {}", e
                                                );
                                            }
                                        }
                                        Err(e) => {
                                            warn!(
                                                session_id = self.session_id,
                                                "Failed to create ASR processor for resume: {}", e
                                            );
                                        }
                                    }
                                }
                            }
                        }
                    }
                    SessionEvent::Error { track_id, .. } => {
                        // Only errors from the server-side track are handled.
                        if track_id != server_side_track_id {
                            continue;
                        }

                        // If music-on-hold is active, pick what to play next: the
                        // bundled fallback file when it exists and is not already
                        // the current one, otherwise the same path again.
                        let moh_info = {
                            let mut state = self.call_state.write().await;
                            if let Some(path) = state.moh.clone() {
                                let fallback = "./config/sounds/refer_moh.wav".to_string();
                                let next_path = if path != fallback
                                    && std::path::Path::new(&fallback).exists()
                                {
                                    info!(
                                        session_id = self.session_id,
                                        "moh error, switching to fallback: {}", fallback
                                    );
                                    state.moh = Some(fallback.clone());
                                    fallback
                                } else {
                                    info!(
                                        session_id = self.session_id,
                                        "looping moh on error: {}", path
                                    );
                                    path
                                };
                                Some(next_path)
                            } else {
                                None
                            }
                        };

                        // Restart music-on-hold on a new file track with a fresh SSRC.
                        if let Some(next_path) = moh_info {
                            let ssrc = rand::random::<u32>();
                            let file_track = FileTrack::new(self.server_side_track_id.clone())
                                .with_play_id(Some(next_path.clone()))
                                .with_ssrc(ssrc)
                                .with_path(next_path.clone())
                                .with_cancel_token(self.cancel_token.child_token());
                            self.update_track_wrapper(Box::new(file_track), Some(next_path))
                                .await;
                            continue;
                        }
                    }
                    _ => {}
                }
            }
        };

        // The first future to finish ends the call's processing.
        select! {
            _ = wait_input_timeout_loop=>{
                info!(session_id = self.session_id, "wait input timeout loop done");
            }
            _ = self.media_stream.serve() => {
                info!(session_id = self.session_id, "media stream loop done");
            }
            _ = event_hook_loop => {
                info!(session_id = self.session_id, "event loop done");
            }
        }
        Ok(())
    }
690
691    async fn dispatch(&self, command: Command) -> Result<()> {
692        match command {
693            Command::Invite { option } => self.do_invite(option).await,
694            Command::Accept { option } => self.do_accept(option).await,
695            Command::Reject { reason, code } => {
696                self.do_reject(code.map(|c| (c as u16).into()), Some(reason))
697                    .await
698            }
699            Command::Ringing {
700                ringtone,
701                recorder,
702                early_media,
703            } => self.do_ringing(ringtone, recorder, early_media).await,
704            Command::Tts {
705                text,
706                speaker,
707                play_id,
708                auto_hangup,
709                streaming,
710                end_of_stream,
711                option,
712                wait_input_timeout,
713                base64,
714                cache_key,
715            } => {
716                self.do_tts(
717                    text,
718                    speaker,
719                    play_id,
720                    auto_hangup,
721                    streaming.unwrap_or_default(),
722                    end_of_stream.unwrap_or_default(),
723                    option,
724                    wait_input_timeout,
725                    base64.unwrap_or_default(),
726                    cache_key,
727                )
728                .await
729            }
730            Command::Play {
731                url,
732                play_id,
733                auto_hangup,
734                wait_input_timeout,
735            } => {
736                self.do_play(url, play_id, auto_hangup, wait_input_timeout)
737                    .await
738            }
739            Command::Hangup {
740                reason,
741                initiator,
742                headers,
743            } => {
744                let reason = reason.map(|r| {
745                    r.parse::<CallRecordHangupReason>()
746                        .unwrap_or(CallRecordHangupReason::BySystem)
747                });
748                self.do_hangup(reason, initiator, headers).await
749            }
750            Command::Refer {
751                caller,
752                callee,
753                options,
754            } => self.do_refer(caller, callee, options).await,
755            Command::Mute { track_id } => self.do_mute(track_id).await,
756            Command::Unmute { track_id } => self.do_unmute(track_id).await,
757            Command::Pause {} => self.do_pause().await,
758            Command::Resume {} => self.do_resume().await,
759            Command::Interrupt {
760                graceful: passage,
761                fade_out_ms: _,
762            } => self.do_interrupt(passage.unwrap_or_default()).await,
763            Command::History { speaker, text } => self.do_history(speaker, text).await,
764        }
765    }
766
767    fn build_record_option(&self, option: &CallOption) -> Option<RecorderOption> {
768        if let Some(recorder_option) = &option.recorder {
769            let mut recorder_file = recorder_option.recorder_file.clone();
770            if recorder_file.contains("{id}") {
771                recorder_file = recorder_file.replace("{id}", &self.session_id);
772            }
773
774            let recorder_file = if recorder_file.is_empty() {
775                self.app_state.get_recorder_file(&self.session_id)
776            } else {
777                let p = Path::new(&recorder_file);
778                p.is_absolute()
779                    .then(|| recorder_file.clone())
780                    .unwrap_or_else(|| self.app_state.get_recorder_file(&recorder_file))
781            };
782            info!(
783                session_id = self.session_id,
784                recorder_file, "created recording file"
785            );
786
787            let track_samplerate = self.track_config.samplerate;
788            let recorder_samplerate = if track_samplerate > 0 {
789                track_samplerate
790            } else {
791                recorder_option.samplerate
792            };
793            let recorder_ptime = if recorder_option.ptime == 0 {
794                200
795            } else {
796                recorder_option.ptime
797            };
798            let requested_format = recorder_option
799                .format
800                .unwrap_or(self.app_state.config.recorder_format());
801            let format = requested_format.effective();
802            if requested_format != format {
803                warn!(
804                    session_id = self.session_id,
805                    requested = requested_format.extension(),
806                    "Recorder format fallback to wav due to unsupported feature"
807                );
808            }
809            let mut recorder_config = RecorderOption {
810                recorder_file,
811                samplerate: recorder_samplerate,
812                ptime: recorder_ptime,
813                format: Some(format),
814            };
815            recorder_config.ensure_path_extension(format);
816            Some(recorder_config)
817        } else {
818            None
819        }
820    }
821
822    async fn invite_or_accept(&self, mut option: CallOption, sender: String) -> Result<CallOption> {
823        // Merge with existing configuration (e.g., from playbook)
824        {
825            let state = self.call_state.read().await;
826            option = state.merge_option(option);
827        }
828
829        option.check_default();
830        if let Some(opt) = self.build_record_option(&option) {
831            self.media_stream.update_recorder_option(opt).await;
832        }
833
834        if let Some(opt) = &option.media_pass {
835            let track_id = self.server_side_track_id.clone();
836            let cancel_token = self.cancel_token.child_token();
837            let ssrc = rand::random::<u32>();
838            let media_pass_track = MediaPassTrack::new(
839                self.session_id.clone(),
840                ssrc,
841                track_id,
842                cancel_token,
843                opt.clone(),
844            );
845            self.update_track_wrapper(Box::new(media_pass_track), None)
846                .await;
847        }
848
849        info!(
850            session_id = self.session_id,
851            call_type = ?self.call_type,
852            sender,
853            ?option,
854            "caller with option"
855        );
856
857        match self.setup_caller_track(&option).await {
858            Ok(_) => return Ok(option),
859            Err(e) => {
860                self.app_state
861                    .total_failed_calls
862                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
863                let error_event = crate::event::SessionEvent::Error {
864                    track_id: self.session_id.clone(),
865                    timestamp: crate::media::get_timestamp(),
866                    sender,
867                    error: e.to_string(),
868                    code: None,
869                };
870                self.event_sender.send(error_event).ok();
871                self.do_hangup(Some(CallRecordHangupReason::BySystem), None, None)
872                    .await
873                    .ok();
874                return Err(e);
875            }
876        }
877    }
878
879    async fn do_invite(&self, option: CallOption) -> Result<()> {
880        self.invite_or_accept(option, "invite".to_string())
881            .await
882            .map(|_| ())
883    }
884
885    async fn do_accept(&self, mut option: CallOption) -> Result<()> {
886        let has_pending = self
887            .invitation
888            .find_dialog_id_by_session_id(&self.session_id)
889            .is_some();
890        let ready_to_answer_val = {
891            let state = self.call_state.read().await;
892            state.ready_to_answer.is_none()
893        };
894
895        if ready_to_answer_val {
896            if !has_pending {
897                // emit reject event
898                warn!(session_id = self.session_id, "no pending call to accept");
899                let rejet_event = crate::event::SessionEvent::Reject {
900                    track_id: self.session_id.clone(),
901                    timestamp: crate::media::get_timestamp(),
902                    reason: "no pending call".to_string(),
903                    refer: None,
904                    code: Some(486),
905                };
906                self.event_sender.send(rejet_event).ok();
907                self.do_hangup(Some(CallRecordHangupReason::BySystem), None, None)
908                    .await
909                    .ok();
910                return Err(anyhow::anyhow!("no pending call to accept"));
911            }
912            option = self.invite_or_accept(option, "accept".to_string()).await?;
913        } else {
914            option.check_default();
915            self.call_state.write().await.option = Some(option.clone());
916        }
917        info!(session_id = self.session_id, ?option, "accepting call");
918        let ready = self.call_state.write().await.ready_to_answer.take();
919        if let Some((answer, track, dialog)) = ready {
920            info!(
921                session_id = self.session_id,
922                track_id = track.as_ref().map(|t| t.id()),
923                "ready to answer with track"
924            );
925
926            let headers = vec![rsip::Header::ContentType(
927                "application/sdp".to_string().into(),
928            )];
929
930            match dialog.accept(Some(headers), Some(answer.as_bytes().to_vec())) {
931                Ok(_) => {
932                    {
933                        let mut state = self.call_state.write().await;
934                        state.answer = Some(answer);
935                        state.answer_time = Some(Utc::now());
936                    }
937                    self.finish_caller_stack(&option, track).await?;
938                }
939                Err(e) => {
940                    warn!(session_id = self.session_id, "failed to accept call: {}", e);
941                    return Err(anyhow::anyhow!("failed to accept call"));
942                }
943            }
944        }
945        return Ok(());
946    }
947
948    async fn do_reject(
949        &self,
950        code: Option<rsip::StatusCode>,
951        reason: Option<String>,
952    ) -> Result<()> {
953        match self
954            .invitation
955            .find_dialog_id_by_session_id(&self.session_id)
956        {
957            Some(id) => {
958                info!(
959                    session_id = self.session_id,
960                    ?reason,
961                    ?code,
962                    "rejecting call"
963                );
964                self.invitation.hangup(id, code, reason).await
965            }
966            None => Ok(()),
967        }
968    }
969
970    async fn do_ringing(
971        &self,
972        ringtone: Option<String>,
973        recorder: Option<RecorderOption>,
974        early_media: Option<bool>,
975    ) -> Result<()> {
976        let ready_to_answer_val = self.call_state.read().await.ready_to_answer.is_none();
977        if ready_to_answer_val {
978            let option = CallOption {
979                recorder,
980                ..Default::default()
981            };
982            let _ = self.invite_or_accept(option, "ringing".to_string()).await?;
983        }
984
985        let state = self.call_state.read().await;
986        if let Some((answer, _, dialog)) = state.ready_to_answer.as_ref() {
987            let (headers, body) = if early_media.unwrap_or_default() || ringtone.is_some() {
988                let headers = vec![rsip::Header::ContentType(
989                    "application/sdp".to_string().into(),
990                )];
991                (Some(headers), Some(answer.as_bytes().to_vec()))
992            } else {
993                (None, None)
994            };
995
996            dialog.ringing(headers, body).ok();
997            info!(
998                session_id = self.session_id,
999                ringtone, early_media, "playing ringtone"
1000            );
1001            if let Some(ringtone_url) = ringtone {
1002                drop(state);
1003                self.do_play(ringtone_url, None, None, None).await.ok();
1004            } else {
1005                info!(session_id = self.session_id, "no ringtone to play");
1006            }
1007        }
1008        Ok(())
1009    }
1010
1011    async fn do_tts(
1012        &self,
1013        text: String,
1014        speaker: Option<String>,
1015        play_id: Option<String>,
1016        auto_hangup: Option<bool>,
1017        streaming: bool,
1018        end_of_stream: bool,
1019        option: Option<SynthesisOption>,
1020        wait_input_timeout: Option<u32>,
1021        base64: bool,
1022        cache_key: Option<String>,
1023    ) -> Result<()> {
1024        let tts_option = {
1025            let call_state = self.call_state.read().await;
1026            match call_state.option.clone().unwrap_or_default().tts {
1027                Some(opt) => opt.merge_with(option),
1028                None => {
1029                    if let Some(opt) = option {
1030                        opt
1031                    } else {
1032                        return Err(anyhow::anyhow!("no tts option available"));
1033                    }
1034                }
1035            }
1036        };
1037        let speaker = match speaker {
1038            Some(s) => Some(s),
1039            None => tts_option.speaker.clone(),
1040        };
1041
1042        let mut play_command = SynthesisCommand {
1043            text,
1044            speaker,
1045            play_id: play_id.clone(),
1046            streaming,
1047            end_of_stream,
1048            option: tts_option,
1049            base64,
1050            cache_key,
1051        };
1052        info!(
1053            session_id = self.session_id,
1054            provider = ?play_command.option.provider,
1055            text = %play_command.text.chars().take(10).collect::<String>(),
1056            speaker = play_command.speaker.as_deref(),
1057            auto_hangup = auto_hangup.unwrap_or_default(),
1058            play_id = play_command.play_id.as_deref(),
1059            streaming = play_command.streaming,
1060            end_of_stream = play_command.end_of_stream,
1061            wait_input_timeout = wait_input_timeout.unwrap_or_default(),
1062            is_base64 = play_command.base64,
1063            cache_key = play_command.cache_key.as_deref(),
1064            "new synthesis"
1065        );
1066
1067        let ssrc = rand::random::<u32>();
1068        let (should_interrupt, picked_ssrc) = {
1069            let mut state = self.call_state.write().await;
1070
1071            let (target_ssrc, changed) = if let Some(handle) = &state.tts_handle {
1072                if play_id.is_some() && state.current_play_id != play_id {
1073                    (ssrc, true)
1074                } else {
1075                    (handle.ssrc, false)
1076                }
1077            } else {
1078                (ssrc, false)
1079            };
1080
1081            state.auto_hangup = match auto_hangup {
1082                Some(true) => Some((target_ssrc, CallRecordHangupReason::BySystem)),
1083                _ => state.auto_hangup.clone(),
1084            };
1085            state.wait_input_timeout = wait_input_timeout;
1086
1087            state.current_play_id = play_id.clone();
1088            (changed, target_ssrc)
1089        };
1090
1091        if should_interrupt {
1092            let _ = self.do_interrupt(false).await;
1093        }
1094
1095        let existing_handle = self.call_state.read().await.tts_handle.clone();
1096        if let Some(tts_handle) = existing_handle {
1097            match tts_handle.try_send(play_command) {
1098                Ok(_) => return Ok(()),
1099                Err(e) => {
1100                    play_command = e.0;
1101                }
1102            }
1103        }
1104
1105        let (new_handle, tts_track) = StreamEngine::create_tts_track(
1106            self.app_state.stream_engine.clone(),
1107            self.cancel_token.child_token(),
1108            self.session_id.clone(),
1109            self.server_side_track_id.clone(),
1110            picked_ssrc,
1111            play_id.clone(),
1112            streaming,
1113            &play_command.option,
1114        )
1115        .await?;
1116
1117        new_handle.try_send(play_command)?;
1118        self.call_state.write().await.tts_handle = Some(new_handle);
1119        self.update_track_wrapper(tts_track, play_id).await;
1120        Ok(())
1121    }
1122
1123    async fn do_play(
1124        &self,
1125        url: String,
1126        play_id: Option<String>,
1127        auto_hangup: Option<bool>,
1128        wait_input_timeout: Option<u32>,
1129    ) -> Result<()> {
1130        let ssrc = rand::random::<u32>();
1131        info!(
1132            session_id = self.session_id,
1133            ssrc, url, play_id, auto_hangup, "play file track"
1134        );
1135
1136        let play_id = play_id.or(Some(url.clone()));
1137
1138        let file_track = FileTrack::new(self.server_side_track_id.clone())
1139            .with_play_id(play_id.clone())
1140            .with_ssrc(ssrc)
1141            .with_path(url)
1142            .with_cancel_token(self.cancel_token.child_token());
1143
1144        {
1145            let mut state = self.call_state.write().await;
1146            state.tts_handle = None;
1147            state.auto_hangup = match auto_hangup {
1148                Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
1149                _ => None,
1150            };
1151            state.wait_input_timeout = wait_input_timeout;
1152        }
1153
1154        self.update_track_wrapper(Box::new(file_track), play_id)
1155            .await;
1156        Ok(())
1157    }
1158
1159    async fn do_history(&self, speaker: String, text: String) -> Result<()> {
1160        self.event_sender
1161            .send(SessionEvent::AddHistory {
1162                sender: Some(self.session_id.clone()),
1163                timestamp: crate::media::get_timestamp(),
1164                speaker,
1165                text,
1166            })
1167            .map(|_| ())
1168            .map_err(Into::into)
1169    }
1170
1171    async fn do_interrupt(&self, graceful: bool) -> Result<()> {
1172        {
1173            let mut state = self.call_state.write().await;
1174            state.tts_handle = None;
1175            state.moh = None;
1176        }
1177        self.media_stream
1178            .remove_track(&self.server_side_track_id, graceful)
1179            .await;
1180        Ok(())
1181    }
1182    async fn do_pause(&self) -> Result<()> {
1183        Ok(())
1184    }
1185    async fn do_resume(&self) -> Result<()> {
1186        Ok(())
1187    }
1188    async fn do_hangup(
1189        &self,
1190        reason: Option<CallRecordHangupReason>,
1191        initiator: Option<String>,
1192        headers: Option<HashMap<String, String>>,
1193    ) -> Result<()> {
1194        info!(
1195            session_id = self.session_id,
1196            ?reason,
1197            ?initiator,
1198            ?headers,
1199            "do_hangup"
1200        );
1201
1202        // Store headers in extras if provided
1203        if let Some(headers) = headers {
1204            let h_val = serde_json::to_value(&headers).unwrap_or_default();
1205
1206            {
1207                let mut state = self.call_state.write().await;
1208                let mut extras = state.extras.take().unwrap_or_default();
1209                extras.insert("_hangup_headers".to_string(), h_val.clone());
1210                state.extras = Some(extras);
1211            }
1212        } else {
1213        }
1214
1215        let hangup_reason = match initiator.as_deref() {
1216            Some("caller") => CallRecordHangupReason::ByCaller,
1217            Some("callee") => CallRecordHangupReason::ByCallee,
1218            Some("system") => CallRecordHangupReason::Autohangup,
1219            _ => reason.unwrap_or(CallRecordHangupReason::BySystem),
1220        };
1221
1222        self.media_stream
1223            .stop(Some(hangup_reason.to_string()), initiator);
1224
1225        self.call_state
1226            .write()
1227            .await
1228            .set_hangup_reason(hangup_reason);
1229        Ok(())
1230    }
1231
1232    async fn do_refer(
1233        &self,
1234        caller: String,
1235        callee: String,
1236        refer_option: Option<ReferOption>,
1237    ) -> Result<()> {
1238        self.do_interrupt(false).await.ok();
1239
1240        // Check if we should pause parent ASR
1241        let pause_parent_asr = refer_option
1242            .as_ref()
1243            .and_then(|o| o.pause_parent_asr)
1244            .unwrap_or(false);
1245
1246        // Save original ASR option for later resume
1247        let original_asr_option = if pause_parent_asr {
1248            let cs = self.call_state.read().await;
1249            cs.option.as_ref().and_then(|o| o.asr.clone())
1250        } else {
1251            None
1252        };
1253
1254        // Pause parent ASR if requested
1255        if pause_parent_asr {
1256            info!(
1257                session_id = self.session_id,
1258                "Pausing parent call ASR during refer"
1259            );
1260            self.media_stream
1261                .remove_processor::<crate::media::asr_processor::AsrProcessor>(
1262                    &self.server_side_track_id,
1263                )
1264                .await
1265                .ok();
1266        }
1267
1268        let mut moh = refer_option.as_ref().and_then(|o| o.moh.clone());
1269        if let Some(ref path) = moh {
1270            if !path.starts_with("http") && !std::path::Path::new(path).exists() {
1271                let fallback = "./config/sounds/refer_moh.wav";
1272                if std::path::Path::new(fallback).exists() {
1273                    info!(
1274                        session_id = self.session_id,
1275                        "moh {} not found, using fallback {}", path, fallback
1276                    );
1277                    moh = Some(fallback.to_string());
1278                }
1279            }
1280        }
1281        let ref_call_id = refer_option
1282            .as_ref()
1283            .and_then(|o| o.call_id.clone())
1284            .unwrap_or_else(|| format!("ref-{}-{}", rand::random::<u32>(), self.session_id));
1285
1286        let session_id = self.session_id.clone();
1287        let track_id = self.server_side_track_id.clone();
1288
1289        let recorder = {
1290            let cs = self.call_state.read().await;
1291            cs.option
1292                .as_ref()
1293                .map(|o| o.recorder.clone())
1294                .unwrap_or_default()
1295        };
1296
1297        let call_option = CallOption {
1298            caller: Some(caller),
1299            callee: Some(callee.clone()),
1300            sip: refer_option.as_ref().and_then(|o| o.sip.clone()),
1301            asr: refer_option.as_ref().and_then(|o| o.asr.clone()),
1302            denoise: refer_option.as_ref().and_then(|o| o.denoise.clone()),
1303            recorder,
1304            ..Default::default()
1305        };
1306
1307        let mut invite_option = call_option.build_invite_option()?;
1308        invite_option.call_id = Some(ref_call_id);
1309
1310        let headers = invite_option.headers.get_or_insert_with(|| Vec::new());
1311
1312        {
1313            let cs = self.call_state.read().await;
1314            if let Some(opt) = cs.option.as_ref() {
1315                if let Some(callee) = opt.callee.as_ref() {
1316                    headers.push(rsip::Header::Other(
1317                        "X-Referred-To".to_string(),
1318                        callee.clone(),
1319                    ));
1320                }
1321                if let Some(caller) = opt.caller.as_ref() {
1322                    headers.push(rsip::Header::Other(
1323                        "X-Referred-From".to_string(),
1324                        caller.clone(),
1325                    ));
1326                }
1327            }
1328        }
1329
1330        headers.push(rsip::Header::Other(
1331            "X-Referred-Id".to_string(),
1332            self.session_id.clone(),
1333        ));
1334
1335        let ssrc = rand::random::<u32>();
1336        let refer_call_state = Arc::new(RwLock::new(ActiveCallState {
1337            start_time: Utc::now(),
1338            ssrc,
1339            option: Some(call_option.clone()),
1340            is_refer: true,
1341            ..Default::default()
1342        }));
1343
1344        {
1345            let mut cs = self.call_state.write().await;
1346            cs.refer_callstate.replace(refer_call_state.clone());
1347        }
1348
1349        let auto_hangup_requested = refer_option
1350            .as_ref()
1351            .and_then(|o| o.auto_hangup)
1352            .unwrap_or(true);
1353
1354        if auto_hangup_requested {
1355            self.call_state.write().await.auto_hangup =
1356                Some((ssrc, CallRecordHangupReason::ByRefer));
1357        } else {
1358            self.call_state.write().await.auto_hangup = None;
1359        }
1360
1361        // Setup ASR resume after refer ends (if not auto_hangup and ASR was paused)
1362        if !auto_hangup_requested && pause_parent_asr && original_asr_option.is_some() {
1363            let asr_option = original_asr_option.unwrap();
1364            self.call_state.write().await.pending_asr_resume = Some((ssrc, asr_option));
1365        }
1366
1367        let timeout_secs = refer_option.as_ref().and_then(|o| o.timeout).unwrap_or(30);
1368
1369        info!(
1370            session_id = self.session_id,
1371            ssrc,
1372            auto_hangup = auto_hangup_requested,
1373            callee,
1374            timeout_secs,
1375            "do_refer"
1376        );
1377
1378        let r = tokio::time::timeout(
1379            Duration::from_secs(timeout_secs as u64),
1380            self.create_outgoing_sip_track(
1381                self.cancel_token.child_token(),
1382                refer_call_state.clone(),
1383                &track_id,
1384                invite_option,
1385                &call_option,
1386                moh,
1387                auto_hangup_requested,
1388            ),
1389        )
1390        .await;
1391
1392        {
1393            self.call_state.write().await.moh = None;
1394        }
1395
1396        let result = match r {
1397            Ok(res) => res,
1398            Err(_) => {
1399                warn!(
1400                    session_id = session_id,
1401                    "refer sip track creation timed out after {} seconds", timeout_secs
1402                );
1403                self.event_sender
1404                    .send(SessionEvent::Reject {
1405                        track_id,
1406                        timestamp: crate::media::get_timestamp(),
1407                        reason: "Timeout when refer".into(),
1408                        code: Some(408),
1409                        refer: Some(true),
1410                    })
1411                    .ok();
1412                return Err(anyhow::anyhow!("refer sip track creation timed out").into());
1413            }
1414        };
1415
1416        match result {
1417            Ok(answer) => {
1418                self.event_sender
1419                    .send(SessionEvent::Answer {
1420                        timestamp: crate::media::get_timestamp(),
1421                        track_id,
1422                        sdp: answer,
1423                        refer: Some(true),
1424                    })
1425                    .ok();
1426            }
1427            Err(e) => {
1428                warn!(
1429                    session_id = session_id,
1430                    "failed to create refer sip track: {}", e
1431                );
1432                match &e {
1433                    rsipstack::Error::DialogError(reason, _, code) => {
1434                        self.event_sender
1435                            .send(SessionEvent::Reject {
1436                                track_id,
1437                                timestamp: crate::media::get_timestamp(),
1438                                reason: reason.clone(),
1439                                code: Some(code.code() as u32),
1440                                refer: Some(true),
1441                            })
1442                            .ok();
1443                    }
1444                    _ => {}
1445                }
1446                return Err(e.into());
1447            }
1448        }
1449        Ok(())
1450    }
1451
1452    async fn do_mute(&self, track_id: Option<String>) -> Result<()> {
1453        self.media_stream.mute_track(track_id).await;
1454        Ok(())
1455    }
1456
1457    async fn do_unmute(&self, track_id: Option<String>) -> Result<()> {
1458        self.media_stream.unmute_track(track_id).await;
1459        Ok(())
1460    }
1461
1462    pub async fn cleanup(&self) -> Result<()> {
1463        self.call_state.write().await.tts_handle = None;
1464        self.media_stream.cleanup().await.ok();
1465        Ok(())
1466    }
1467
1468    pub fn get_callrecord(&self) -> Option<CallRecord> {
1469        self.call_state.try_read().ok().map(|call_state| {
1470            call_state.build_callrecord(
1471                self.app_state.clone(),
1472                self.session_id.clone(),
1473                self.call_type.clone(),
1474            )
1475        })
1476    }
1477
1478    async fn dump_to_file(
1479        &self,
1480        dump_file: &mut File,
1481        cmd_receiver: &mut CommandReceiver,
1482        event_receiver: &mut EventReceiver,
1483    ) {
1484        loop {
1485            select! {
1486                _ = self.cancel_token.cancelled() => {
1487                    break;
1488                }
1489                Ok(cmd) = cmd_receiver.recv() => {
1490                    CallRecordEvent::write(CallRecordEventType::Command, cmd, dump_file)
1491                        .await;
1492                }
1493                Ok(event) = event_receiver.recv() => {
1494                    if matches!(event, SessionEvent::Binary{..}) {
1495                        continue;
1496                    }
1497                    CallRecordEvent::write(CallRecordEventType::Event, event, dump_file)
1498                        .await;
1499                }
1500            };
1501        }
1502    }
1503
1504    async fn dump_loop(
1505        &self,
1506        dump_events: bool,
1507        mut dump_cmd_receiver: CommandReceiver,
1508        mut dump_event_receiver: EventReceiver,
1509    ) {
1510        if !dump_events {
1511            return;
1512        }
1513
1514        let file_name = self.app_state.get_dump_events_file(&self.session_id);
1515        let mut dump_file = match File::options()
1516            .create(true)
1517            .append(true)
1518            .open(&file_name)
1519            .await
1520        {
1521            Ok(file) => file,
1522            Err(e) => {
1523                warn!(
1524                    session_id = self.session_id,
1525                    file_name, "failed to open dump events file: {}", e
1526                );
1527                return;
1528            }
1529        };
1530        self.dump_to_file(
1531            &mut dump_file,
1532            &mut dump_cmd_receiver,
1533            &mut dump_event_receiver,
1534        )
1535        .await;
1536
1537        while let Ok(event) = dump_event_receiver.try_recv() {
1538            if matches!(event, SessionEvent::Binary { .. }) {
1539                continue;
1540            }
1541            CallRecordEvent::write(CallRecordEventType::Event, event, &mut dump_file).await;
1542        }
1543    }
1544
1545    pub async fn create_rtp_track(
1546        &self,
1547        track_id: TrackId,
1548        ssrc: u32,
1549        enable_srtp: Option<bool>,
1550    ) -> Result<RtcTrack> {
1551        let mut rtc_config = RtcTrackConfig::default();
1552        // Per-call flag takes precedence over global config.
1553        let use_srtp = enable_srtp
1554            .or(self.app_state.config.enable_srtp)
1555            .unwrap_or(false);
1556        rtc_config.mode = if use_srtp {
1557            rustrtc::TransportMode::Srtp
1558        } else {
1559            rustrtc::TransportMode::Rtp
1560        };
1561
1562        if let Some(codecs) = &self.app_state.config.codecs {
1563            let mut codec_types = Vec::new();
1564            for c in codecs {
1565                match c.to_lowercase().as_str() {
1566                    "pcmu" => codec_types.push(CodecType::PCMU),
1567                    "pcma" => codec_types.push(CodecType::PCMA),
1568                    "g722" => codec_types.push(CodecType::G722),
1569                    "g729" => codec_types.push(CodecType::G729),
1570                    #[cfg(feature = "opus")]
1571                    "opus" => codec_types.push(CodecType::Opus),
1572                    "dtmf" | "2833" | "telephone_event" => {
1573                        codec_types.push(CodecType::TelephoneEvent)
1574                    }
1575                    _ => {}
1576                }
1577            }
1578            if !codec_types.is_empty() {
1579                rtc_config.preferred_codec = Some(codec_types[0].clone());
1580                rtc_config.codecs = codec_types;
1581            }
1582        }
1583
1584        if rtc_config.preferred_codec.is_none() {
1585            rtc_config.preferred_codec = Some(self.track_config.codec.clone());
1586        }
1587
1588        rtc_config.rtp_port_range = self
1589            .app_state
1590            .config
1591            .rtp_start_port
1592            .zip(self.app_state.config.rtp_end_port);
1593
1594        if let Some(ref external_ip) = self.app_state.config.external_ip {
1595            rtc_config.external_ip = Some(external_ip.clone());
1596        }
1597        if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
1598            rtc_config.bind_ip = Some(bind_ip.clone());
1599        }
1600
1601        rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
1602        rtc_config.enable_ice_lite = self.app_state.config.enable_ice_lite;
1603
1604        let mut track = RtcTrack::new(
1605            self.cancel_token.child_token(),
1606            track_id,
1607            self.track_config.clone(),
1608            rtc_config,
1609        )
1610        .with_ssrc(ssrc);
1611
1612        track.create().await?;
1613
1614        Ok(track)
1615    }
1616
1617    async fn setup_caller_track(&self, option: &CallOption) -> Result<()> {
1618        let hangup_headers = option
1619            .sip
1620            .as_ref()
1621            .and_then(|s| s.hangup_headers.as_ref())
1622            .map(|headers_map| {
1623                headers_map
1624                    .iter()
1625                    .map(|(k, v)| rsip::Header::Other(k.clone(), v.clone()))
1626                    .collect::<Vec<rsip::Header>>()
1627            });
1628        self.call_state.write().await.option = Some(option.clone());
1629        info!(
1630            session_id = self.session_id,
1631            call_type = ?self.call_type,
1632            "setup caller track"
1633        );
1634
1635        let track = match self.call_type {
1636            ActiveCallType::Webrtc => Some(self.create_webrtc_track().await?),
1637            ActiveCallType::WebSocket => {
1638                let audio_receiver = self.call_state.write().await.audio_receiver.take();
1639                if let Some(receiver) = audio_receiver {
1640                    Some(self.create_websocket_track(receiver).await?)
1641                } else {
1642                    None
1643                }
1644            }
1645            ActiveCallType::Sip => {
1646                if let Some(dialog_id) = self
1647                    .invitation
1648                    .find_dialog_id_by_session_id(&self.session_id)
1649                {
1650                    if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1651                        return self
1652                            .prepare_incoming_sip_track(
1653                                self.cancel_token.clone(),
1654                                self.call_state.clone(),
1655                                &self.session_id,
1656                                pending_dialog,
1657                                hangup_headers,
1658                            )
1659                            .await;
1660                    }
1661                }
1662
1663                // Auto-inject credentials from registered users if not already provided
1664                let mut option = option.clone();
1665                if option.sip.is_none()
1666                    || option
1667                        .sip
1668                        .as_ref()
1669                        .and_then(|s| s.username.as_ref())
1670                        .is_none()
1671                {
1672                    if let Some(callee) = &option.callee {
1673                        if let Some(cred) = self.app_state.find_credentials_for_callee(callee) {
1674                            if option.sip.is_none() {
1675                                option.sip = Some(crate::SipOption {
1676                                    username: Some(cred.username.clone()),
1677                                    password: Some(cred.password.clone()),
1678                                    realm: cred.realm.clone(),
1679                                    ..Default::default()
1680                                });
1681                            }
1682                        }
1683                    }
1684                }
1685
1686                let mut invite_option = option.build_invite_option()?;
1687                invite_option.call_id = Some(self.session_id.clone());
1688
1689                match self
1690                    .create_outgoing_sip_track(
1691                        self.cancel_token.clone(),
1692                        self.call_state.clone(),
1693                        &self.session_id,
1694                        invite_option,
1695                        &option,
1696                        None,
1697                        false,
1698                    )
1699                    .await
1700                {
1701                    Ok(answer) => {
1702                        self.event_sender
1703                            .send(SessionEvent::Answer {
1704                                timestamp: crate::media::get_timestamp(),
1705                                track_id: self.session_id.clone(),
1706                                sdp: answer,
1707                                refer: Some(false),
1708                            })
1709                            .ok();
1710                        return Ok(());
1711                    }
1712                    Err(e) => {
1713                        warn!(
1714                            session_id = self.session_id,
1715                            "failed to create sip track: {}", e
1716                        );
1717                        match &e {
1718                            rsipstack::Error::DialogError(reason, _, code) => {
1719                                self.event_sender
1720                                    .send(SessionEvent::Reject {
1721                                        track_id: self.session_id.clone(),
1722                                        timestamp: crate::media::get_timestamp(),
1723                                        reason: reason.clone(),
1724                                        code: Some(code.code() as u32),
1725                                        refer: Some(false),
1726                                    })
1727                                    .ok();
1728                            }
1729                            _ => {}
1730                        }
1731                        return Err(e.into());
1732                    }
1733                }
1734            }
1735            ActiveCallType::B2bua => {
1736                if let Some(dialog_id) = self
1737                    .invitation
1738                    .find_dialog_id_by_session_id(&self.session_id)
1739                {
1740                    if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1741                        return self
1742                            .prepare_incoming_sip_track(
1743                                self.cancel_token.clone(),
1744                                self.call_state.clone(),
1745                                &self.session_id,
1746                                pending_dialog,
1747                                hangup_headers,
1748                            )
1749                            .await;
1750                    }
1751                }
1752
1753                warn!(
1754                    session_id = self.session_id,
1755                    "no pending dialog found for B2BUA call"
1756                );
1757                return Err(anyhow::anyhow!(
1758                    "no pending dialog found for session_id: {}",
1759                    self.session_id
1760                ));
1761            }
1762        };
1763        match track {
1764            Some(track) => {
1765                self.finish_caller_stack(&option, Some(track)).await?;
1766            }
1767            None => {
1768                warn!(session_id = self.session_id, "no track created for caller");
1769                return Err(anyhow::anyhow!("no track created for caller"));
1770            }
1771        }
1772        Ok(())
1773    }
1774
1775    async fn finish_caller_stack(
1776        &self,
1777        option: &CallOption,
1778        track: Option<Box<dyn Track>>,
1779    ) -> Result<()> {
1780        if let Some(track) = track {
1781            self.setup_track_with_stream(&option, track).await?;
1782        }
1783
1784        {
1785            let call_state = self.call_state.read().await;
1786            if let Some(ref answer) = call_state.answer {
1787                info!(
1788                    session_id = self.session_id,
1789                    "sending answer event: {}", answer,
1790                );
1791                self.event_sender
1792                    .send(SessionEvent::Answer {
1793                        timestamp: crate::media::get_timestamp(),
1794                        track_id: self.session_id.clone(),
1795                        sdp: answer.clone(),
1796                        refer: Some(false),
1797                    })
1798                    .ok();
1799            } else {
1800                warn!(
1801                    session_id = self.session_id,
1802                    "no answer in state to send event"
1803                );
1804            }
1805        }
1806        Ok(())
1807    }
1808
1809    pub async fn setup_track_with_stream(
1810        &self,
1811        option: &CallOption,
1812        mut track: Box<dyn Track>,
1813    ) -> Result<()> {
1814        let processors = match StreamEngine::create_processors(
1815            self.app_state.stream_engine.clone(),
1816            track.as_ref(),
1817            self.cancel_token.child_token(),
1818            self.event_sender.clone(),
1819            self.media_stream.packet_sender.clone(),
1820            option,
1821        )
1822        .await
1823        {
1824            Ok(processors) => processors,
1825            Err(e) => {
1826                warn!(
1827                    session_id = self.session_id,
1828                    "failed to prepare stream processors: {}", e
1829                );
1830                vec![]
1831            }
1832        };
1833
1834        // Add all processors from the hook
1835        for processor in processors {
1836            track.append_processor(processor);
1837        }
1838
1839        self.update_track_wrapper(track, None).await;
1840        Ok(())
1841    }
1842
1843    pub async fn update_track_wrapper(&self, mut track: Box<dyn Track>, play_id: Option<String>) {
1844        let (ambiance_opt, subscribe) = {
1845            let state = self.call_state.read().await;
1846            let mut opt = state
1847                .option
1848                .as_ref()
1849                .and_then(|o| o.ambiance.clone())
1850                .unwrap_or_default();
1851
1852            if let Some(global) = &self.app_state.config.ambiance {
1853                opt.merge(global);
1854            }
1855
1856            let subscribe = state
1857                .option
1858                .as_ref()
1859                .and_then(|o| o.subscribe)
1860                .unwrap_or_default();
1861
1862            (opt, subscribe)
1863        };
1864        if track.id() == &self.server_side_track_id && ambiance_opt.path.is_some() {
1865            match AmbianceProcessor::new(ambiance_opt).await {
1866                Ok(ambiance) => {
1867                    info!(session_id = self.session_id, "loaded ambiance processor");
1868                    track.append_processor(Box::new(ambiance));
1869                }
1870                Err(e) => {
1871                    tracing::error!("failed to load ambiance wav {}", e);
1872                }
1873            }
1874        }
1875
1876        if subscribe && self.call_type != ActiveCallType::WebSocket {
1877            let (track_index, sub_track_id) = if track.id() == &self.server_side_track_id {
1878                (0, self.server_side_track_id.clone())
1879            } else {
1880                (1, self.session_id.clone())
1881            };
1882            let sub_processor =
1883                SubscribeProcessor::new(self.event_sender.clone(), sub_track_id, track_index);
1884            track.append_processor(Box::new(sub_processor));
1885        }
1886
1887        self.call_state.write().await.current_play_id = play_id.clone();
1888        self.media_stream.update_track(track, play_id).await;
1889    }
1890
1891    pub async fn create_websocket_track(
1892        &self,
1893        audio_receiver: WebsocketBytesReceiver,
1894    ) -> Result<Box<dyn Track>> {
1895        let (ssrc, codec) = {
1896            let call_state = self.call_state.read().await;
1897            (
1898                call_state.ssrc,
1899                call_state
1900                    .option
1901                    .as_ref()
1902                    .map(|o| o.codec.clone())
1903                    .unwrap_or_default(),
1904            )
1905        };
1906
1907        let ws_track = WebsocketTrack::new(
1908            self.cancel_token.child_token(),
1909            self.session_id.clone(),
1910            self.track_config.clone(),
1911            self.event_sender.clone(),
1912            audio_receiver,
1913            codec,
1914            ssrc,
1915        );
1916
1917        {
1918            let mut call_state = self.call_state.write().await;
1919            call_state.answer_time = Some(Utc::now());
1920            call_state.answer = Some("".to_string());
1921            call_state.last_status_code = 200;
1922        }
1923
1924        Ok(Box::new(ws_track))
1925    }
1926
1927    pub(super) async fn create_webrtc_track(&self) -> Result<Box<dyn Track>> {
1928        let (ssrc, option) = {
1929            let call_state = self.call_state.read().await;
1930            (
1931                call_state.ssrc,
1932                call_state.option.clone().unwrap_or_default(),
1933            )
1934        };
1935
1936        let mut rtc_config = RtcTrackConfig::default();
1937        rtc_config.mode = rustrtc::TransportMode::WebRtc; // WebRTC
1938        rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1939
1940        if let Some(codecs) = &self.app_state.config.codecs {
1941            let mut codec_types = Vec::new();
1942            for c in codecs {
1943                match c.to_lowercase().as_str() {
1944                    "pcmu" => codec_types.push(CodecType::PCMU),
1945                    "pcma" => codec_types.push(CodecType::PCMA),
1946                    "g722" => codec_types.push(CodecType::G722),
1947                    "g729" => codec_types.push(CodecType::G729),
1948                    #[cfg(feature = "opus")]
1949                    "opus" => codec_types.push(CodecType::Opus),
1950                    "dtmf" | "2833" | "telephone_event" => {
1951                        codec_types.push(CodecType::TelephoneEvent)
1952                    }
1953                    _ => {}
1954                }
1955            }
1956            if !codec_types.is_empty() {
1957                rtc_config.preferred_codec = Some(codec_types[0].clone());
1958                rtc_config.codecs = codec_types;
1959            }
1960        }
1961
1962        if let Some(ref external_ip) = self.app_state.config.external_ip {
1963            rtc_config.external_ip = Some(external_ip.clone());
1964        }
1965        if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
1966            rtc_config.bind_ip = Some(bind_ip.clone());
1967        }
1968
1969        let mut webrtc_track = RtcTrack::new(
1970            self.cancel_token.child_token(),
1971            self.session_id.clone(),
1972            self.track_config.clone(),
1973            rtc_config,
1974        )
1975        .with_ssrc(ssrc);
1976
1977        let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1978        let offer = match option.enable_ipv6 {
1979            Some(false) | None => {
1980                strip_ipv6_candidates(option.offer.as_ref().unwrap_or(&"".to_string()))
1981            }
1982            _ => option.offer.clone().unwrap_or("".to_string()),
1983        };
1984        let answer: Option<String>;
1985        match webrtc_track.handshake(offer, timeout).await {
1986            Ok(answer_sdp) => {
1987                answer = match option.enable_ipv6 {
1988                    Some(false) | None => Some(strip_ipv6_candidates(&answer_sdp)),
1989                    Some(true) => Some(answer_sdp.to_string()),
1990                };
1991            }
1992            Err(e) => {
1993                warn!(session_id = self.session_id, "failed to setup track: {}", e);
1994                return Err(anyhow::anyhow!("Failed to setup track: {}", e));
1995            }
1996        }
1997
1998        {
1999            let mut call_state = self.call_state.write().await;
2000            call_state.answer_time = Some(Utc::now());
2001            call_state.answer = answer;
2002            call_state.last_status_code = 200;
2003        }
2004        Ok(Box::new(webrtc_track))
2005    }
2006
    /// Sends an outgoing SIP INVITE for `track_id` and wires up its RTP track.
    ///
    /// Steps, in order:
    /// 1. Create an RTP track (honouring the per-call `enable_srtp` setting)
    ///    and use its local description as the SDP offer.
    /// 2. Store the offer in the call option held by the call state and reset
    ///    `start_time` to now.
    /// 3. Rewrite the Contact URI when `contact_needs_public_resolution`
    ///    reports that the current one is not usable.
    /// 4. With `moh` set, remember it in the call state and, if nothing is
    ///    currently playing, start a `FileTrack` on the server-side track; the
    ///    RTP track is attached only after the INVITE returns. Without `moh`
    ///    the RTP track is attached right away.
    /// 5. Spawn the dialog-state handler, then send the INVITE and wait for it.
    /// 6. Resolve the answer SDP, falling back to the answer already stored in
    ///    the call state when the final response carries no usable body.
    ///
    /// When `auto_hangup` is true, the call state's `auto_hangup` is set to
    /// this track's SSRC with reason `ByRefer`.
    ///
    /// Returns the answer SDP as a string.
    ///
    /// # Errors
    /// - `rsipstack::Error::Error` when the RTP track, its local description
    ///   or the stream setup fails, or when no local SIP address matches the
    ///   callee.
    /// - Whatever `invitation.invite` returns on failure.
    /// - `rsipstack::Error::DialogError` with `NotAcceptableHere` when the
    ///   INVITE yields no answer and no answer is stored in the call state.
    async fn create_outgoing_sip_track(
        &self,
        cancel_token: CancellationToken,
        call_state_ref: ActiveCallStateRef,
        track_id: &String,
        mut invite_option: InviteOption,
        call_option: &CallOption,
        moh: Option<String>,
        auto_hangup: bool,
    ) -> Result<String, rsipstack::Error> {
        let ssrc = call_state_ref.read().await.ssrc;
        let per_call_srtp = call_option.sip.as_ref().and_then(|s| s.enable_srtp);
        let rtp_track = self
            .create_rtp_track(track_id.clone(), ssrc, per_call_srtp)
            .await
            .map_err(|e| rsipstack::Error::Error(e.to_string()))?;

        // The local description of the fresh RTP track is the SDP offer.
        let offer = Some(
            rtp_track
                .local_description()
                .await
                .map_err(|e| rsipstack::Error::Error(e.to_string()))?,
        );

        {
            // Record the offer on the stored call option (if any) and restart the call clock.
            let mut cs = call_state_ref.write().await;
            if let Some(o) = cs.option.as_mut() {
                o.offer = offer.clone();
            }
            cs.start_time = Utc::now();
        };

        invite_option.offer = offer.clone().map(|s| s.into());

        // Set contact to local SIP endpoint address if not already set explicitly
        // Check if contact is still default (no scheme set) or if host is localhost-like
        let needs_contact = contact_needs_public_resolution(&invite_option.contact);

        if needs_contact {
            let addrs = self.invitation.dialog_layer.endpoint.get_addrs();
            if let Some(addr) = find_local_addr_for_uri(&addrs, &invite_option.callee) {
                // Prefer the user part of the existing contact, then the caller's.
                let contact_username = invite_option
                    .contact
                    .auth
                    .as_ref()
                    .map(|auth| auth.user.as_str())
                    .or_else(|| {
                        invite_option
                            .caller
                            .auth
                            .as_ref()
                            .map(|auth| auth.user.as_str())
                    });
                invite_option.contact = build_public_contact_uri(
                    &self.app_state.learned_public_address,
                    self.app_state.auto_learn_public_address_enabled(),
                    &addr,
                    contact_username,
                    Some(&invite_option.contact),
                );
            } else {
                return Err(rsipstack::Error::Error(format!(
                    "missing local SIP address for callee transport: {}",
                    invite_option.callee
                )));
            }
        }

        // `Some` while the RTP track still has to be attached to the media stream;
        // taken immediately when there is no MOH, otherwise after the INVITE returns.
        let mut rtp_track_to_setup = Some(Box::new(rtp_track) as Box<dyn Track>);

        if let Some(moh) = moh {
            let ssrc_and_moh = {
                let mut state = call_state_ref.write().await;
                state.moh = Some(moh.clone());
                if state.current_play_id.is_none() {
                    // The MOH file track gets its own random SSRC.
                    let ssrc = rand::random::<u32>();
                    Some((ssrc, moh.clone()))
                } else {
                    info!(
                        session_id = self.session_id,
                        "Something is playing, MOH will start after it ends"
                    );
                    None
                }
            };

            if let Some((ssrc, moh_path)) = ssrc_and_moh {
                // Play MOH on the server-side track, using the path as its play id.
                let file_track = FileTrack::new(self.server_side_track_id.clone())
                    .with_play_id(Some(moh_path.clone()))
                    .with_ssrc(ssrc)
                    .with_path(moh_path.clone())
                    .with_cancel_token(self.cancel_token.child_token());
                self.update_track_wrapper(Box::new(file_track), Some(moh_path))
                    .await;
            }
        } else {
            let track = rtp_track_to_setup.take().unwrap();
            self.setup_track_with_stream(&call_option, track)
                .await
                .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
        }

        info!(
            session_id = self.session_id,
            track_id,
            contact = %invite_option.contact,
            "invite {} -> {} offer: \n{}",
            invite_option.caller,
            invite_option.callee,
            offer.as_ref().map(|s| s.as_str()).unwrap_or("<NO OFFER>")
        );

        let (dlg_state_sender, dlg_state_receiver) =
            self.invitation.dialog_layer.new_dialog_state_channel();

        let states = InviteDialogStates {
            is_client: true,
            session_id: self.session_id.clone(),
            track_id: track_id.clone(),
            event_sender: self.event_sender.clone(),
            media_stream: self.media_stream.clone(),
            call_state: call_state_ref.clone(),
            cancel_token,
            terminated_reason: None,
            has_early_media: false,
        };

        // Extra SIP headers configured for the hangup, handed to the dialog handler.
        let hangup_headers = call_option
            .sip
            .as_ref()
            .and_then(|s| s.hangup_headers.as_ref())
            .map(|headers_map| {
                headers_map
                    .iter()
                    .map(|(k, v)| rsip::Header::Other(k.clone(), v.clone()))
                    .collect::<Vec<rsip::Header>>()
            });

        let mut client_dialog_handler = DialogStateReceiverGuard::new(
            self.invitation.dialog_layer.clone(),
            dlg_state_receiver,
            hangup_headers,
        );

        // The handler is spawned before the INVITE is sent so that dialog state
        // changes are processed while `invite` is still awaited below.
        crate::spawn(async move {
            client_dialog_handler.process_dialog(states).await;
        });

        let (dialog_id, answer) = self
            .invitation
            .invite(invite_option, dlg_state_sender)
            .await?;

        // NOTE(review): on an invite error the `?` above returns before `moh` is
        // cleared and before any MOH track is removed — confirm callers clean up.
        self.call_state.write().await.moh = None;

        if let Some(track) = rtp_track_to_setup {
            // Only reached on the MOH path, where attaching the RTP track was deferred.
            info!(
                session_id = self.session_id,
                track_id, "Stopping MOH and setting up RTP track"
            );
            self.media_stream
                .remove_track(&self.server_side_track_id, false)
                .await;

            self.setup_track_with_stream(&call_option, track)
                .await
                .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
        }

        // Resolve the answer SDP together with a flag telling whether it was
        // taken from the call state (and is treated as already applied).
        let answer = match answer {
            Some(answer) => {
                let s = String::from_utf8_lossy(&answer).to_string();
                if s.trim().is_empty() {
                    // 200 OK had no body — this is valid per RFC 3261 when the answer was
                    // already negotiated in a 183 Session Progress (early media).
                    // Fall back to the early SDP stored by the Early handler.
                    let cs = call_state_ref.read().await;
                    match cs.answer.clone() {
                        Some(early_sdp) if !early_sdp.is_empty() => {
                            info!(
                                session_id = self.session_id,
                                "200 OK has empty body; using early-media SDP from 183"
                            );
                            (early_sdp, true /* already applied */)
                        }
                        _ => {
                            // Unlike the `None` case below, an empty body without a
                            // stored answer is not an error: the empty string is returned.
                            warn!(
                                session_id = self.session_id,
                                "200 OK has empty body and no early-media SDP available"
                            );
                            (s, false)
                        }
                    }
                } else {
                    (s, false)
                }
            }
            None => {
                // No answer body at all — check if early media SDP is available before failing
                let cs = call_state_ref.read().await;
                match cs.answer.clone() {
                    Some(early_sdp) if !early_sdp.is_empty() => {
                        info!(
                            session_id = self.session_id,
                            "200 OK had no answer; using early-media SDP from 183"
                        );
                        (early_sdp, true /* already applied */)
                    }
                    _ => {
                        warn!(session_id = self.session_id, "no answer received");
                        return Err(rsipstack::Error::DialogError(
                            "No answer received".to_string(),
                            dialog_id,
                            rsip::StatusCode::NotAcceptableHere,
                        ));
                    }
                }
            }
        };
        let (answer, remote_description_already_applied) = answer;

        {
            let mut cs = call_state_ref.write().await;
            // An answer already present in the state is kept as is.
            if cs.answer.is_none() {
                cs.answer = Some(answer.clone());
            }
            if auto_hangup {
                cs.auto_hangup = Some((ssrc, CallRecordHangupReason::ByRefer));
            }
        }
        if !remote_description_already_applied {
            // Best-effort: a failure to apply the remote description is ignored.
            self.media_stream
                .update_remote_description(&track_id, &answer)
                .await
                .ok();
        }

        Ok(answer)
    }
2246
2247    /// Detect if SDP is WebRTC format
2248    pub fn is_webrtc_sdp(sdp: &str) -> bool {
2249        (sdp.contains("a=ice-ufrag:") || sdp.contains("a=ice-pwd:"))
2250            && sdp.contains("a=fingerprint:")
2251    }
2252
2253    pub async fn setup_answer_track(
2254        &self,
2255        ssrc: u32,
2256        option: &CallOption,
2257        offer: String,
2258    ) -> Result<(String, Box<dyn Track>)> {
2259        let offer = match option.enable_ipv6 {
2260            Some(false) | None => strip_ipv6_candidates(&offer),
2261            _ => offer.clone(),
2262        };
2263
2264        let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
2265
2266        let mut media_track = if Self::is_webrtc_sdp(&offer) {
2267            let mut rtc_config = RtcTrackConfig::default();
2268            rtc_config.mode = rustrtc::TransportMode::WebRtc;
2269            rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
2270            if let Some(ref external_ip) = self.app_state.config.external_ip {
2271                rtc_config.external_ip = Some(external_ip.clone());
2272            }
2273            if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
2274                rtc_config.bind_ip = Some(bind_ip.clone());
2275            }
2276            rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
2277            rtc_config.enable_ice_lite = self.app_state.config.enable_ice_lite;
2278
2279            let webrtc_track = RtcTrack::new(
2280                self.cancel_token.child_token(),
2281                self.session_id.clone(),
2282                self.track_config.clone(),
2283                rtc_config,
2284            )
2285            .with_ssrc(ssrc);
2286
2287            Box::new(webrtc_track) as Box<dyn Track>
2288        } else {
2289            let per_call_srtp = option.sip.as_ref().and_then(|s| s.enable_srtp);
2290            let rtp_track = self
2291                .create_rtp_track(self.session_id.clone(), ssrc, per_call_srtp)
2292                .await?;
2293            Box::new(rtp_track) as Box<dyn Track>
2294        };
2295
2296        let answer = match media_track.handshake(offer.clone(), timeout).await {
2297            Ok(answer) => answer,
2298            Err(e) => {
2299                return Err(anyhow::anyhow!("handshake failed: {e}"));
2300            }
2301        };
2302
2303        return Ok((answer, media_track));
2304    }
2305
2306    pub async fn prepare_incoming_sip_track(
2307        &self,
2308        cancel_token: CancellationToken,
2309        call_state_ref: ActiveCallStateRef,
2310        track_id: &String,
2311        pending_dialog: PendingDialog,
2312        hangup_headers: Option<Vec<rsip::Header>>,
2313    ) -> Result<()> {
2314        let state_receiver = pending_dialog.state_receiver;
2315        //let pending_token_clone = pending_dialog.token;
2316
2317        let states = InviteDialogStates {
2318            is_client: false,
2319            session_id: self.session_id.clone(),
2320            track_id: track_id.clone(),
2321            event_sender: self.event_sender.clone(),
2322            media_stream: self.media_stream.clone(),
2323            call_state: self.call_state.clone(),
2324            cancel_token,
2325            terminated_reason: None,
2326            has_early_media: false,
2327        };
2328
2329        let initial_request = pending_dialog.dialog.initial_request();
2330        let offer = String::from_utf8_lossy(&initial_request.body).to_string();
2331
2332        let (ssrc, option) = {
2333            let call_state = call_state_ref.read().await;
2334            (
2335                call_state.ssrc,
2336                call_state.option.clone().unwrap_or_default(),
2337            )
2338        };
2339
2340        match self.setup_answer_track(ssrc, &option, offer).await {
2341            Ok((offer, track)) => {
2342                self.setup_track_with_stream(&option, track).await?;
2343                {
2344                    let mut state = self.call_state.write().await;
2345                    state.ready_to_answer = Some((offer, None, pending_dialog.dialog));
2346                }
2347            }
2348            Err(e) => {
2349                return Err(anyhow::anyhow!("error creating track: {}", e));
2350            }
2351        }
2352
2353        let mut client_dialog_handler = DialogStateReceiverGuard::new(
2354            self.invitation.dialog_layer.clone(),
2355            state_receiver,
2356            hangup_headers,
2357        );
2358
2359        crate::spawn(async move {
2360            client_dialog_handler.process_dialog(states).await;
2361        });
2362        Ok(())
2363    }
2364}
2365
2366impl Drop for ActiveCall {
2367    fn drop(&mut self) {
2368        info!(session_id = self.session_id, "dropping active call");
2369        if let Some(sender) = self.app_state.callrecord_sender.as_ref() {
2370            if let Some(record) = self.get_callrecord() {
2371                if let Err(e) = sender.send(record) {
2372                    warn!(
2373                        session_id = self.session_id,
2374                        "failed to send call record: {}", e
2375                    );
2376                }
2377            }
2378        }
2379    }
2380}
2381
2382impl ActiveCallState {
2383    pub fn merge_option(&self, mut option: CallOption) -> CallOption {
2384        if let Some(existing) = &self.option {
2385            if option.asr.is_none() {
2386                option.asr = existing.asr.clone();
2387            }
2388            if option.tts.is_none() {
2389                option.tts = existing.tts.clone();
2390            }
2391            if option.vad.is_none() {
2392                option.vad = existing.vad.clone();
2393            }
2394            if option.denoise.is_none() {
2395                option.denoise = existing.denoise;
2396            }
2397            if option.recorder.is_none() {
2398                option.recorder = existing.recorder.clone();
2399            }
2400            if option.eou.is_none() {
2401                option.eou = existing.eou.clone();
2402            }
2403            if option.extra.is_none() {
2404                option.extra = existing.extra.clone();
2405            }
2406            if option.ambiance.is_none() {
2407                option.ambiance = existing.ambiance.clone();
2408            }
2409        }
2410        option
2411    }
2412
2413    pub fn set_hangup_reason(&mut self, reason: CallRecordHangupReason) {
2414        if self.hangup_reason.is_none() {
2415            self.hangup_reason = Some(reason);
2416        }
2417    }
2418
2419    pub fn build_hangup_event(
2420        &self,
2421        track_id: TrackId,
2422        initiator: Option<String>,
2423    ) -> crate::event::SessionEvent {
2424        let from = self.option.as_ref().and_then(|o| o.caller.as_ref());
2425        let to = self.option.as_ref().and_then(|o| o.callee.as_ref());
2426        let extra = self.extras.clone();
2427
2428        crate::event::SessionEvent::Hangup {
2429            track_id,
2430            timestamp: crate::media::get_timestamp(),
2431            reason: Some(format!("{:?}", self.hangup_reason)),
2432            initiator,
2433            start_time: self.start_time.to_rfc3339(),
2434            answer_time: self.answer_time.map(|t| t.to_rfc3339()),
2435            ringing_time: self.ring_time.map(|t| t.to_rfc3339()),
2436            hangup_time: Utc::now().to_rfc3339(),
2437            extra,
2438            from: from.map(|f| f.into()),
2439            to: to.map(|f| f.into()),
2440            refer: Some(self.is_refer),
2441        }
2442    }
2443
2444    pub fn build_callrecord(
2445        &self,
2446        app_state: AppState,
2447        session_id: String,
2448        call_type: ActiveCallType,
2449    ) -> CallRecord {
2450        let option = self.option.clone().unwrap_or_default();
2451        let recorder = if option.recorder.is_some() {
2452            let recorder_file = app_state.get_recorder_file(&session_id);
2453            if std::path::Path::new(&recorder_file).exists() {
2454                let file_size = std::fs::metadata(&recorder_file)
2455                    .map(|m| m.len())
2456                    .unwrap_or(0);
2457                vec![crate::callrecord::CallRecordMedia {
2458                    track_id: session_id.clone(),
2459                    path: recorder_file,
2460                    size: file_size,
2461                    extra: None,
2462                }]
2463            } else {
2464                vec![]
2465            }
2466        } else {
2467            vec![]
2468        };
2469
2470        let dump_event_file = app_state.get_dump_events_file(&session_id);
2471        let dump_event_file = if std::path::Path::new(&dump_event_file).exists() {
2472            Some(dump_event_file)
2473        } else {
2474            None
2475        };
2476
2477        let refer_callrecord = self.refer_callstate.as_ref().and_then(|rc| {
2478            if let Ok(rc) = rc.try_read() {
2479                Some(Box::new(rc.build_callrecord(
2480                    app_state.clone(),
2481                    rc.session_id.clone(),
2482                    ActiveCallType::B2bua,
2483                )))
2484            } else {
2485                None
2486            }
2487        });
2488
2489        let caller = option.caller.clone().unwrap_or_default();
2490        let callee = option.callee.clone().unwrap_or_default();
2491
2492        CallRecord {
2493            option: Some(option),
2494            call_id: session_id,
2495            call_type,
2496            start_time: self.start_time,
2497            ring_time: self.ring_time.clone(),
2498            answer_time: self.answer_time.clone(),
2499            end_time: Utc::now(),
2500            caller,
2501            callee,
2502            hangup_reason: self.hangup_reason.clone(),
2503            hangup_messages: Vec::new(),
2504            status_code: self.last_status_code,
2505            extras: self.extras.clone(),
2506            dump_event_file,
2507            recorder,
2508            refer_callrecord,
2509        }
2510    }
2511}