Skip to main content

active_call/call/
active_call.rs

1use super::Command;
2use crate::{
3    CallOption, ReferOption,
4    event::{EventReceiver, EventSender, SessionEvent},
5    media::{
6        TrackId,
7        ambiance::AmbianceProcessor,
8        engine::StreamEngine,
9        negotiate::strip_ipv6_candidates,
10        processor::SubscribeProcessor,
11        recorder::RecorderOption,
12        stream::{MediaStream, MediaStreamBuilder},
13        track::{
14            Track, TrackConfig,
15            file::FileTrack,
16            media_pass::MediaPassTrack,
17            rtc::{RtcTrack, RtcTrackConfig},
18            tts::SynthesisHandle,
19            websocket::{WebsocketBytesReceiver, WebsocketTrack},
20        },
21    },
22    synthesis::{SynthesisCommand, SynthesisOption},
23    transcription::TranscriptionOption,
24};
25use crate::{
26    app::AppState,
27    call::{
28        CommandReceiver, CommandSender,
29        sip::{DialogStateReceiverGuard, Invitation, InviteDialogStates},
30    },
31    callrecord::{CallRecord, CallRecordEvent, CallRecordEventType, CallRecordHangupReason},
32    useragent::invitation::PendingDialog,
33};
34use anyhow::Result;
35use audio_codec::CodecType;
36use chrono::{DateTime, Utc};
37use rsipstack::dialog::{invitation::InviteOption, server_dialog::ServerInviteDialog};
38use serde::{Deserialize, Serialize};
39use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
40use tokio::{fs::File, select, sync::Mutex, sync::RwLock, time::sleep};
41use tokio_util::sync::CancellationToken;
42use tracing::{debug, info, warn};
43
44#[cfg(test)]
45mod tests {
46    use super::*;
47    use crate::app::AppStateBuilder;
48    use crate::callrecord::CallRecordHangupReason;
49    use crate::config::Config;
50    use crate::media::track::tts::SynthesisHandle;
51    use crate::synthesis::SynthesisCommand;
52    use tokio::sync::mpsc;
53
54    #[tokio::test]
55    async fn test_tts_ssrc_reuse_for_autohangup() -> Result<()> {
56        let mut config = Config::default();
57        config.udp_port = 0; // Use random port
58        config.media_cache_path = "/tmp/mediacache".to_string();
59        let stream_engine = Arc::new(StreamEngine::default());
60        let app_state = AppStateBuilder::new()
61            .with_config(config)
62            .with_stream_engine(stream_engine)
63            .build()
64            .await?;
65
66        let cancel_token = CancellationToken::new();
67        let session_id = "test-session".to_string();
68        let track_config = TrackConfig::default();
69
70        let mut option = crate::CallOption::default();
71        option.tts = Some(crate::synthesis::SynthesisOption::default());
72
73        let active_call = Arc::new(ActiveCall::new(
74            ActiveCallType::Sip,
75            cancel_token.clone(),
76            session_id.clone(),
77            app_state.invitation.clone(),
78            app_state.clone(),
79            track_config,
80            None,
81            false,
82            None,
83            None,
84            None,
85        ));
86
87        {
88            let mut state = active_call.call_state.write().await;
89            state.option = Some(option);
90        }
91
92        let (tx, mut rx) = mpsc::unbounded_channel::<SynthesisCommand>();
93        let initial_ssrc = 12345;
94        let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);
95
96        // 1. Set initial TTS handle
97        {
98            let mut state = active_call.call_state.write().await;
99            state.tts_handle = Some(handle);
100            state.current_play_id = Some("play_1".to_string());
101        }
102
103        // 2. Call do_tts with auto_hangup=true and same play_id
104        active_call
105            .do_tts(
106                "hangup now".to_string(),
107                None,
108                Some("play_1".to_string()),
109                Some(true),
110                false,
111                true,
112                None,
113                None,
114                false,
115                None,
116            )
117            .await?;
118
119        // 3. Verify auto_hangup state uses the EXISTING SSRC
120        {
121            let state = active_call.call_state.read().await;
122            assert!(state.auto_hangup.is_some());
123            let (h_ssrc, reason) = state.auto_hangup.clone().unwrap();
124            assert_eq!(
125                h_ssrc, initial_ssrc,
126                "SSRC should be reused from existing handle"
127            );
128            assert_eq!(reason, CallRecordHangupReason::BySystem);
129        }
130
131        // 4. Verify command was sent to existing channel
132        let cmd = rx.try_recv().expect("Should have received tts command");
133        assert_eq!(cmd.text, "hangup now");
134
135        Ok(())
136    }
137
138    #[tokio::test]
139    async fn test_tts_new_ssrc_for_different_play_id() -> Result<()> {
140        let mut config = Config::default();
141        config.udp_port = 0; // Use random port
142        config.media_cache_path = "/tmp/mediacache".to_string();
143        let stream_engine = Arc::new(StreamEngine::default());
144        let app_state = AppStateBuilder::new()
145            .with_config(config)
146            .with_stream_engine(stream_engine)
147            .build()
148            .await?;
149
150        let active_call = Arc::new(ActiveCall::new(
151            ActiveCallType::Sip,
152            CancellationToken::new(),
153            "test-session".to_string(),
154            app_state.invitation.clone(),
155            app_state.clone(),
156            TrackConfig::default(),
157            None,
158            false,
159            None,
160            None,
161            None,
162        ));
163
164        let mut tts_opt = crate::synthesis::SynthesisOption::default();
165        tts_opt.provider = Some(crate::synthesis::SynthesisType::Aliyun);
166        let mut option = crate::CallOption::default();
167        option.tts = Some(tts_opt);
168        {
169            let mut state = active_call.call_state.write().await;
170            state.option = Some(option);
171        }
172
173        let (tx, _rx) = mpsc::unbounded_channel();
174        let initial_ssrc = 111;
175        let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);
176
177        {
178            let mut state = active_call.call_state.write().await;
179            state.tts_handle = Some(handle);
180            state.current_play_id = Some("play_1".to_string());
181        }
182
183        // Call do_tts with DIFFERENT play_id
184        active_call
185            .do_tts(
186                "new play".to_string(),
187                None,
188                Some("play_2".to_string()),
189                Some(true),
190                false,
191                true,
192                None,
193                None,
194                false,
195                None,
196            )
197            .await?;
198
199        // Verify auto_hangup uses a NEW SSRC (because it should interrupt and start fresh)
200        {
201            let state = active_call.call_state.read().await;
202            let (h_ssrc, _) = state.auto_hangup.clone().unwrap();
203            assert_ne!(
204                h_ssrc, initial_ssrc,
205                "Should use a new SSRC for different play_id"
206            );
207        }
208
209        Ok(())
210    }
211}
212
213#[derive(Deserialize)]
214#[serde(rename_all = "camelCase")]
215pub struct CallParams {
216    pub id: Option<String>,
217    #[serde(rename = "dump")]
218    pub dump_events: Option<bool>,
219    #[serde(rename = "ping")]
220    pub ping_interval: Option<u32>,
221    pub server_side_track: Option<String>,
222}
223
224#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
225#[serde(rename_all = "camelCase")]
226pub enum ActiveCallType {
227    Webrtc,
228    B2bua,
229    WebSocket,
230    #[default]
231    Sip,
232}
233
234#[derive(Default)]
235pub struct ActiveCallState {
236    pub session_id: String,
237    pub start_time: DateTime<Utc>,
238    pub ring_time: Option<DateTime<Utc>>,
239    pub answer_time: Option<DateTime<Utc>>,
240    pub hangup_reason: Option<CallRecordHangupReason>,
241    pub last_status_code: u16,
242    pub option: Option<CallOption>,
243    pub answer: Option<String>,
244    pub ssrc: u32,
245    pub refer_callstate: Option<ActiveCallStateRef>,
246    pub extras: Option<HashMap<String, serde_json::Value>>,
247    pub is_refer: bool,
248    pub sip_hangup_headers_template: Option<HashMap<String, String>>,
249
250    // Runtime state (migrated from ActiveCall to reduce multiple locks)
251    pub tts_handle: Option<SynthesisHandle>,
252    pub auto_hangup: Option<(u32, CallRecordHangupReason)>,
253    pub wait_input_timeout: Option<u32>,
254    pub moh: Option<String>,
255    pub current_play_id: Option<String>,
256    pub audio_receiver: Option<WebsocketBytesReceiver>,
257    pub ready_to_answer: Option<(String, Option<Box<dyn Track>>, ServerInviteDialog)>,
258    pub pending_asr_resume: Option<(u32, TranscriptionOption)>,
259}
260
261pub type ActiveCallRef = Arc<ActiveCall>;
262pub type ActiveCallStateRef = Arc<RwLock<ActiveCallState>>;
263
264pub struct ActiveCall {
265    pub call_state: ActiveCallStateRef,
266    pub cancel_token: CancellationToken,
267    pub call_type: ActiveCallType,
268    pub session_id: String,
269    pub media_stream: Arc<MediaStream>,
270    pub track_config: TrackConfig,
271    pub event_sender: EventSender,
272    pub app_state: AppState,
273    pub invitation: Invitation,
274    pub cmd_sender: CommandSender,
275    pub dump_events: bool,
276    pub server_side_track_id: TrackId,
277}
278
279pub struct ActiveCallGuard {
280    pub call: ActiveCallRef,
281    pub active_calls: usize,
282}
283
284impl ActiveCallGuard {
285    pub fn new(call: ActiveCallRef) -> Self {
286        let active_calls = {
287            call.app_state
288                .total_calls
289                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
290            let mut calls = call.app_state.active_calls.lock().unwrap();
291            calls.insert(call.session_id.clone(), call.clone());
292            calls.len()
293        };
294        Self { call, active_calls }
295    }
296}
297
298impl Drop for ActiveCallGuard {
299    fn drop(&mut self) {
300        self.call
301            .app_state
302            .active_calls
303            .lock()
304            .unwrap()
305            .remove(&self.call.session_id);
306    }
307}
308
309pub struct ActiveCallReceiver {
310    pub cmd_receiver: CommandReceiver,
311    pub dump_cmd_receiver: CommandReceiver,
312    pub dump_event_receiver: EventReceiver,
313}
314
315impl ActiveCall {
316    pub fn new(
317        call_type: ActiveCallType,
318        cancel_token: CancellationToken,
319        session_id: String,
320        invitation: Invitation,
321        app_state: AppState,
322        track_config: TrackConfig,
323        audio_receiver: Option<WebsocketBytesReceiver>,
324        dump_events: bool,
325        server_side_track_id: Option<TrackId>,
326        extras: Option<HashMap<String, serde_json::Value>>,
327        sip_hangup_headers_template: Option<HashMap<String, String>>,
328    ) -> Self {
329        let event_sender = crate::event::create_event_sender();
330        let cmd_sender = tokio::sync::broadcast::Sender::<Command>::new(32);
331        let media_stream_builder = MediaStreamBuilder::new(event_sender.clone())
332            .with_id(session_id.clone())
333            .with_cancel_token(cancel_token.child_token());
334        let media_stream = Arc::new(media_stream_builder.build());
335        let start_time = Utc::now();
336        // Inject built-in session variables into extras
337        let call_type_str = match &call_type {
338            ActiveCallType::Sip => "sip",
339            ActiveCallType::WebSocket => "websocket",
340            ActiveCallType::Webrtc => "webrtc",
341            ActiveCallType::B2bua => "b2bua",
342        };
343        let extras = {
344            let mut e = extras.unwrap_or_default();
345            e.entry(crate::playbook::BUILTIN_SESSION_ID.to_string())
346                .or_insert_with(|| serde_json::Value::String(session_id.clone()));
347            e.entry(crate::playbook::BUILTIN_CALL_TYPE.to_string())
348                .or_insert_with(|| serde_json::Value::String(call_type_str.to_string()));
349            e.entry(crate::playbook::BUILTIN_START_TIME.to_string())
350                .or_insert_with(|| serde_json::Value::String(start_time.to_rfc3339()));
351            Some(e)
352        };
353        let call_state = Arc::new(RwLock::new(ActiveCallState {
354            session_id: session_id.clone(),
355            start_time,
356            ssrc: rand::random::<u32>(),
357            extras,
358            audio_receiver,
359            sip_hangup_headers_template,
360            ..Default::default()
361        }));
362        Self {
363            cancel_token,
364            call_type,
365            session_id,
366            call_state,
367            media_stream,
368            track_config,
369            event_sender,
370            app_state,
371            invitation,
372            cmd_sender,
373            dump_events,
374            server_side_track_id: server_side_track_id.unwrap_or("server-side-track".to_string()),
375        }
376    }
377
378    pub async fn enqueue_command(&self, command: Command) -> Result<()> {
379        self.cmd_sender
380            .send(command)
381            .map_err(|e| anyhow::anyhow!("Failed to send command: {}", e))?;
382        Ok(())
383    }
384
385    /// Create a new ActiveCallReceiver for this ActiveCall
386    /// `tokio::sync::broadcast` not cached messages, so need to early create receiver
387    /// before calling `serve()`
388    pub fn new_receiver(&self) -> ActiveCallReceiver {
389        ActiveCallReceiver {
390            cmd_receiver: self.cmd_sender.subscribe(),
391            dump_cmd_receiver: self.cmd_sender.subscribe(),
392            dump_event_receiver: self.event_sender.subscribe(),
393        }
394    }
395
396    pub async fn serve(&self, receiver: ActiveCallReceiver) -> Result<()> {
397        let ActiveCallReceiver {
398            mut cmd_receiver,
399            dump_cmd_receiver,
400            dump_event_receiver,
401        } = receiver;
402
403        let process_command_loop = async move {
404            while let Ok(command) = cmd_receiver.recv().await {
405                match self.dispatch(command).await {
406                    Ok(_) => (),
407                    Err(e) => {
408                        warn!(session_id = self.session_id, "{}", e);
409                        self.event_sender
410                            .send(SessionEvent::Error {
411                                track_id: self.session_id.clone(),
412                                timestamp: crate::media::get_timestamp(),
413                                sender: "command".to_string(),
414                                error: e.to_string(),
415                                code: None,
416                            })
417                            .ok();
418                    }
419                }
420            }
421        };
422        self.app_state
423            .total_calls
424            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
425
426        tokio::join!(
427            self.dump_loop(self.dump_events, dump_cmd_receiver, dump_event_receiver),
428            async {
429                select! {
430                    _ = process_command_loop => {
431                        info!(session_id = self.session_id, "command loop done");
432                    }
433                    _ = self.process() => {
434                        info!(session_id = self.session_id, "call serve done");
435                    }
436                    _ = self.cancel_token.cancelled() => {
437                        info!(session_id = self.session_id, "call cancelled - cleaning up resources");
438                    }
439                }
440                self.cancel_token.cancel();
441            }
442        );
443        Ok(())
444    }
445
446    async fn process(&self) -> Result<()> {
447        let mut event_receiver = self.event_sender.subscribe();
448
449        let input_timeout_expire = Arc::new(Mutex::new((0u64, 0u32)));
450        let input_timeout_expire_ref = input_timeout_expire.clone();
451        let event_sender = self.event_sender.clone();
452        let wait_input_timeout_loop = async {
453            loop {
454                let (start_time, expire) = { *input_timeout_expire.lock().await };
455                if expire > 0 && crate::media::get_timestamp() >= start_time + expire as u64 {
456                    info!(session_id = self.session_id, "wait input timeout reached");
457                    *input_timeout_expire.lock().await = (0, 0);
458                    event_sender
459                        .send(SessionEvent::Silence {
460                            track_id: self.server_side_track_id.clone(),
461                            timestamp: crate::media::get_timestamp(),
462                            start_time,
463                            duration: expire as u64,
464                            samples: None,
465                        })
466                        .ok();
467                }
468                sleep(Duration::from_millis(100)).await;
469            }
470        };
471        let server_side_track_id = self.server_side_track_id.clone();
472        let event_hook_loop = async move {
473            while let Ok(event) = event_receiver.recv().await {
474                match event {
475                    SessionEvent::Speaking { .. }
476                    | SessionEvent::Dtmf { .. }
477                    | SessionEvent::AsrDelta { .. }
478                    | SessionEvent::AsrFinal { .. }
479                    | SessionEvent::TrackStart { .. } => {
480                        *input_timeout_expire_ref.lock().await = (0, 0);
481                    }
482                    SessionEvent::TrackEnd {
483                        track_id,
484                        play_id,
485                        ssrc,
486                        ..
487                    } => {
488                        if track_id != server_side_track_id {
489                            continue;
490                        }
491
492                        let (moh_path, auto_hangup, wait_timeout_val) = {
493                            let mut state = self.call_state.write().await;
494                            if play_id != state.current_play_id {
495                                debug!(
496                                    session_id = self.session_id,
497                                    ?play_id,
498                                    current = ?state.current_play_id,
499                                    "ignoring interrupted track end"
500                                );
501                                continue;
502                            }
503                            state.current_play_id = None;
504                            (
505                                state.moh.clone(),
506                                state.auto_hangup.clone(),
507                                state.wait_input_timeout.take(),
508                            )
509                        };
510
511                        if let Some(path) = moh_path {
512                            info!(session_id = self.session_id, "looping moh: {}", path);
513                            let ssrc = rand::random::<u32>();
514                            let file_track = FileTrack::new(self.server_side_track_id.clone())
515                                .with_play_id(Some(path.clone()))
516                                .with_ssrc(ssrc)
517                                .with_path(path.clone())
518                                .with_cancel_token(self.cancel_token.child_token());
519                            self.update_track_wrapper(Box::new(file_track), Some(path))
520                                .await;
521                            continue;
522                        }
523
524                        if let Some((hangup_ssrc, hangup_reason)) = auto_hangup {
525                            if hangup_ssrc == ssrc {
526                                info!(
527                                    session_id = self.session_id,
528                                    ssrc, "auto hangup when track end track_id:{}", track_id
529                                );
530                                self.do_hangup(Some(hangup_reason), None, None).await.ok();
531                            }
532                        }
533
534                        if let Some(timeout) = wait_timeout_val {
535                            let expire = if timeout > 0 {
536                                (crate::media::get_timestamp(), timeout)
537                            } else {
538                                (0, 0)
539                            };
540                            *input_timeout_expire_ref.lock().await = expire;
541                        }
542                    }
543                    SessionEvent::Interrupt { receiver } => {
544                        let track_id =
545                            receiver.unwrap_or_else(|| self.server_side_track_id.clone());
546                        if track_id == self.server_side_track_id {
547                            debug!(
548                                session_id = self.session_id,
549                                "received interrupt event, stopping playback"
550                            );
551                            self.do_interrupt(true).await.ok();
552                        }
553                    }
554                    SessionEvent::Inactivity { track_id, .. } => {
555                        info!(
556                            session_id = self.session_id,
557                            track_id, "inactivity timeout reached, hanging up"
558                        );
559                        self.do_hangup(Some(CallRecordHangupReason::InactivityTimeout), None, None)
560                            .await
561                            .ok();
562                    }
563                    SessionEvent::Hangup { refer, .. } => {
564                        // Check if we need to resume ASR after refer hangup
565                        if refer == Some(true) {
566                            let mut cs = self.call_state.write().await;
567                            if let Some((refer_ssrc, asr_option)) = cs.pending_asr_resume.take() {
568                                // Verify it's the refer call that ended
569                                let is_refer_hangup = cs
570                                    .refer_callstate
571                                    .as_ref()
572                                    .map(|rcs| {
573                                        rcs.try_read()
574                                            .map(|g| g.ssrc == refer_ssrc)
575                                            .unwrap_or(false)
576                                    })
577                                    .unwrap_or(false);
578
579                                if is_refer_hangup {
580                                    drop(cs); // Release lock before async operations
581                                    info!(
582                                        session_id = self.session_id,
583                                        "Refer call ended, resuming parent ASR"
584                                    );
585
586                                    // Resume ASR
587                                    match self
588                                        .app_state
589                                        .stream_engine
590                                        .create_asr_processor(
591                                            self.server_side_track_id.clone(),
592                                            self.cancel_token.child_token(),
593                                            asr_option,
594                                            self.event_sender.clone(),
595                                        )
596                                        .await
597                                    {
598                                        Ok(asr_processor) => {
599                                            if let Err(e) = self
600                                                .media_stream
601                                                .append_processor(
602                                                    &self.server_side_track_id,
603                                                    asr_processor,
604                                                )
605                                                .await
606                                            {
607                                                warn!(
608                                                    session_id = self.session_id,
609                                                    "Failed to resume ASR after refer: {}", e
610                                                );
611                                            }
612                                        }
613                                        Err(e) => {
614                                            warn!(
615                                                session_id = self.session_id,
616                                                "Failed to create ASR processor for resume: {}", e
617                                            );
618                                        }
619                                    }
620                                }
621                            }
622                        }
623                    }
624                    SessionEvent::Error { track_id, .. } => {
625                        if track_id != server_side_track_id {
626                            continue;
627                        }
628
629                        let moh_info = {
630                            let mut state = self.call_state.write().await;
631                            if let Some(path) = state.moh.clone() {
632                                let fallback = "./config/sounds/refer_moh.wav".to_string();
633                                let next_path = if path != fallback
634                                    && std::path::Path::new(&fallback).exists()
635                                {
636                                    info!(
637                                        session_id = self.session_id,
638                                        "moh error, switching to fallback: {}", fallback
639                                    );
640                                    state.moh = Some(fallback.clone());
641                                    fallback
642                                } else {
643                                    info!(
644                                        session_id = self.session_id,
645                                        "looping moh on error: {}", path
646                                    );
647                                    path
648                                };
649                                Some(next_path)
650                            } else {
651                                None
652                            }
653                        };
654
655                        if let Some(next_path) = moh_info {
656                            let ssrc = rand::random::<u32>();
657                            let file_track = FileTrack::new(self.server_side_track_id.clone())
658                                .with_play_id(Some(next_path.clone()))
659                                .with_ssrc(ssrc)
660                                .with_path(next_path.clone())
661                                .with_cancel_token(self.cancel_token.child_token());
662                            self.update_track_wrapper(Box::new(file_track), Some(next_path))
663                                .await;
664                            continue;
665                        }
666                    }
667                    _ => {}
668                }
669            }
670        };
671
672        select! {
673            _ = wait_input_timeout_loop=>{
674                info!(session_id = self.session_id, "wait input timeout loop done");
675            }
676            _ = self.media_stream.serve() => {
677                info!(session_id = self.session_id, "media stream loop done");
678            }
679            _ = event_hook_loop => {
680                info!(session_id = self.session_id, "event loop done");
681            }
682        }
683        Ok(())
684    }
685
686    async fn dispatch(&self, command: Command) -> Result<()> {
687        match command {
688            Command::Invite { option } => self.do_invite(option).await,
689            Command::Accept { option } => self.do_accept(option).await,
690            Command::Reject { reason, code } => {
691                self.do_reject(code.map(|c| (c as u16).into()), Some(reason))
692                    .await
693            }
694            Command::Ringing {
695                ringtone,
696                recorder,
697                early_media,
698            } => self.do_ringing(ringtone, recorder, early_media).await,
699            Command::Tts {
700                text,
701                speaker,
702                play_id,
703                auto_hangup,
704                streaming,
705                end_of_stream,
706                option,
707                wait_input_timeout,
708                base64,
709                cache_key,
710            } => {
711                self.do_tts(
712                    text,
713                    speaker,
714                    play_id,
715                    auto_hangup,
716                    streaming.unwrap_or_default(),
717                    end_of_stream.unwrap_or_default(),
718                    option,
719                    wait_input_timeout,
720                    base64.unwrap_or_default(),
721                    cache_key,
722                )
723                .await
724            }
725            Command::Play {
726                url,
727                play_id,
728                auto_hangup,
729                wait_input_timeout,
730            } => {
731                self.do_play(url, play_id, auto_hangup, wait_input_timeout)
732                    .await
733            }
734            Command::Hangup {
735                reason,
736                initiator,
737                headers,
738            } => {
739                let reason = reason.map(|r| {
740                    r.parse::<CallRecordHangupReason>()
741                        .unwrap_or(CallRecordHangupReason::BySystem)
742                });
743                self.do_hangup(reason, initiator, headers).await
744            }
745            Command::Refer {
746                caller,
747                callee,
748                options,
749            } => self.do_refer(caller, callee, options).await,
750            Command::Mute { track_id } => self.do_mute(track_id).await,
751            Command::Unmute { track_id } => self.do_unmute(track_id).await,
752            Command::Pause {} => self.do_pause().await,
753            Command::Resume {} => self.do_resume().await,
754            Command::Interrupt {
755                graceful: passage,
756                fade_out_ms: _,
757            } => self.do_interrupt(passage.unwrap_or_default()).await,
758            Command::History { speaker, text } => self.do_history(speaker, text).await,
759        }
760    }
761
762    fn build_record_option(&self, option: &CallOption) -> Option<RecorderOption> {
763        if let Some(recorder_option) = &option.recorder {
764            let mut recorder_file = recorder_option.recorder_file.clone();
765            if recorder_file.contains("{id}") {
766                recorder_file = recorder_file.replace("{id}", &self.session_id);
767            }
768
769            let recorder_file = if recorder_file.is_empty() {
770                self.app_state.get_recorder_file(&self.session_id)
771            } else {
772                let p = Path::new(&recorder_file);
773                p.is_absolute()
774                    .then(|| recorder_file.clone())
775                    .unwrap_or_else(|| self.app_state.get_recorder_file(&recorder_file))
776            };
777            info!(
778                session_id = self.session_id,
779                recorder_file, "created recording file"
780            );
781
782            let track_samplerate = self.track_config.samplerate;
783            let recorder_samplerate = if track_samplerate > 0 {
784                track_samplerate
785            } else {
786                recorder_option.samplerate
787            };
788            let recorder_ptime = if recorder_option.ptime == 0 {
789                200
790            } else {
791                recorder_option.ptime
792            };
793            let requested_format = recorder_option
794                .format
795                .unwrap_or(self.app_state.config.recorder_format());
796            let format = requested_format.effective();
797            if requested_format != format {
798                warn!(
799                    session_id = self.session_id,
800                    requested = requested_format.extension(),
801                    "Recorder format fallback to wav due to unsupported feature"
802                );
803            }
804            let mut recorder_config = RecorderOption {
805                recorder_file,
806                samplerate: recorder_samplerate,
807                ptime: recorder_ptime,
808                format: Some(format),
809            };
810            recorder_config.ensure_path_extension(format);
811            Some(recorder_config)
812        } else {
813            None
814        }
815    }
816
817    async fn invite_or_accept(&self, mut option: CallOption, sender: String) -> Result<CallOption> {
818        // Merge with existing configuration (e.g., from playbook)
819        {
820            let state = self.call_state.read().await;
821            option = state.merge_option(option);
822        }
823
824        option.check_default();
825        if let Some(opt) = self.build_record_option(&option) {
826            self.media_stream.update_recorder_option(opt).await;
827        }
828
829        if let Some(opt) = &option.media_pass {
830            let track_id = self.server_side_track_id.clone();
831            let cancel_token = self.cancel_token.child_token();
832            let ssrc = rand::random::<u32>();
833            let media_pass_track = MediaPassTrack::new(
834                self.session_id.clone(),
835                ssrc,
836                track_id,
837                cancel_token,
838                opt.clone(),
839            );
840            self.update_track_wrapper(Box::new(media_pass_track), None)
841                .await;
842        }
843
844        info!(
845            session_id = self.session_id,
846            call_type = ?self.call_type,
847            sender,
848            ?option,
849            "caller with option"
850        );
851
852        match self.setup_caller_track(&option).await {
853            Ok(_) => return Ok(option),
854            Err(e) => {
855                self.app_state
856                    .total_failed_calls
857                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
858                let error_event = crate::event::SessionEvent::Error {
859                    track_id: self.session_id.clone(),
860                    timestamp: crate::media::get_timestamp(),
861                    sender,
862                    error: e.to_string(),
863                    code: None,
864                };
865                self.event_sender.send(error_event).ok();
866                self.do_hangup(Some(CallRecordHangupReason::BySystem), None, None)
867                    .await
868                    .ok();
869                return Err(e);
870            }
871        }
872    }
873
874    async fn do_invite(&self, option: CallOption) -> Result<()> {
875        self.invite_or_accept(option, "invite".to_string())
876            .await
877            .map(|_| ())
878    }
879
880    async fn do_accept(&self, mut option: CallOption) -> Result<()> {
881        let has_pending = self
882            .invitation
883            .find_dialog_id_by_session_id(&self.session_id)
884            .is_some();
885        let ready_to_answer_val = {
886            let state = self.call_state.read().await;
887            state.ready_to_answer.is_none()
888        };
889
890        if ready_to_answer_val {
891            if !has_pending {
892                // emit reject event
893                warn!(session_id = self.session_id, "no pending call to accept");
894                let rejet_event = crate::event::SessionEvent::Reject {
895                    track_id: self.session_id.clone(),
896                    timestamp: crate::media::get_timestamp(),
897                    reason: "no pending call".to_string(),
898                    refer: None,
899                    code: Some(486),
900                };
901                self.event_sender.send(rejet_event).ok();
902                self.do_hangup(Some(CallRecordHangupReason::BySystem), None, None)
903                    .await
904                    .ok();
905                return Err(anyhow::anyhow!("no pending call to accept"));
906            }
907            option = self.invite_or_accept(option, "accept".to_string()).await?;
908        } else {
909            option.check_default();
910            self.call_state.write().await.option = Some(option.clone());
911        }
912        info!(session_id = self.session_id, ?option, "accepting call");
913        let ready = self.call_state.write().await.ready_to_answer.take();
914        if let Some((answer, track, dialog)) = ready {
915            info!(
916                session_id = self.session_id,
917                track_id = track.as_ref().map(|t| t.id()),
918                "ready to answer with track"
919            );
920
921            let headers = vec![rsip::Header::ContentType(
922                "application/sdp".to_string().into(),
923            )];
924
925            match dialog.accept(Some(headers), Some(answer.as_bytes().to_vec())) {
926                Ok(_) => {
927                    {
928                        let mut state = self.call_state.write().await;
929                        state.answer = Some(answer);
930                        state.answer_time = Some(Utc::now());
931                    }
932                    self.finish_caller_stack(&option, track).await?;
933                }
934                Err(e) => {
935                    warn!(session_id = self.session_id, "failed to accept call: {}", e);
936                    return Err(anyhow::anyhow!("failed to accept call"));
937                }
938            }
939        }
940        return Ok(());
941    }
942
943    async fn do_reject(
944        &self,
945        code: Option<rsip::StatusCode>,
946        reason: Option<String>,
947    ) -> Result<()> {
948        match self
949            .invitation
950            .find_dialog_id_by_session_id(&self.session_id)
951        {
952            Some(id) => {
953                info!(
954                    session_id = self.session_id,
955                    ?reason,
956                    ?code,
957                    "rejecting call"
958                );
959                self.invitation.hangup(id, code, reason).await
960            }
961            None => Ok(()),
962        }
963    }
964
965    async fn do_ringing(
966        &self,
967        ringtone: Option<String>,
968        recorder: Option<RecorderOption>,
969        early_media: Option<bool>,
970    ) -> Result<()> {
971        let ready_to_answer_val = self.call_state.read().await.ready_to_answer.is_none();
972        if ready_to_answer_val {
973            let option = CallOption {
974                recorder,
975                ..Default::default()
976            };
977            let _ = self.invite_or_accept(option, "ringing".to_string()).await?;
978        }
979
980        let state = self.call_state.read().await;
981        if let Some((answer, _, dialog)) = state.ready_to_answer.as_ref() {
982            let (headers, body) = if early_media.unwrap_or_default() || ringtone.is_some() {
983                let headers = vec![rsip::Header::ContentType(
984                    "application/sdp".to_string().into(),
985                )];
986                (Some(headers), Some(answer.as_bytes().to_vec()))
987            } else {
988                (None, None)
989            };
990
991            dialog.ringing(headers, body).ok();
992            info!(
993                session_id = self.session_id,
994                ringtone, early_media, "playing ringtone"
995            );
996            if let Some(ringtone_url) = ringtone {
997                drop(state);
998                self.do_play(ringtone_url, None, None, None).await.ok();
999            } else {
1000                info!(session_id = self.session_id, "no ringtone to play");
1001            }
1002        }
1003        Ok(())
1004    }
1005
1006    async fn do_tts(
1007        &self,
1008        text: String,
1009        speaker: Option<String>,
1010        play_id: Option<String>,
1011        auto_hangup: Option<bool>,
1012        streaming: bool,
1013        end_of_stream: bool,
1014        option: Option<SynthesisOption>,
1015        wait_input_timeout: Option<u32>,
1016        base64: bool,
1017        cache_key: Option<String>,
1018    ) -> Result<()> {
1019        let tts_option = {
1020            let call_state = self.call_state.read().await;
1021            match call_state.option.clone().unwrap_or_default().tts {
1022                Some(opt) => opt.merge_with(option),
1023                None => {
1024                    if let Some(opt) = option {
1025                        opt
1026                    } else {
1027                        return Err(anyhow::anyhow!("no tts option available"));
1028                    }
1029                }
1030            }
1031        };
1032        let speaker = match speaker {
1033            Some(s) => Some(s),
1034            None => tts_option.speaker.clone(),
1035        };
1036
1037        let mut play_command = SynthesisCommand {
1038            text,
1039            speaker,
1040            play_id: play_id.clone(),
1041            streaming,
1042            end_of_stream,
1043            option: tts_option,
1044            base64,
1045            cache_key,
1046        };
1047        info!(
1048            session_id = self.session_id,
1049            provider = ?play_command.option.provider,
1050            text = %play_command.text.chars().take(10).collect::<String>(),
1051            speaker = play_command.speaker.as_deref(),
1052            auto_hangup = auto_hangup.unwrap_or_default(),
1053            play_id = play_command.play_id.as_deref(),
1054            streaming = play_command.streaming,
1055            end_of_stream = play_command.end_of_stream,
1056            wait_input_timeout = wait_input_timeout.unwrap_or_default(),
1057            is_base64 = play_command.base64,
1058            cache_key = play_command.cache_key.as_deref(),
1059            "new synthesis"
1060        );
1061
1062        let ssrc = rand::random::<u32>();
1063        let (should_interrupt, picked_ssrc) = {
1064            let mut state = self.call_state.write().await;
1065
1066            let (target_ssrc, changed) = if let Some(handle) = &state.tts_handle {
1067                if play_id.is_some() && state.current_play_id != play_id {
1068                    (ssrc, true)
1069                } else {
1070                    (handle.ssrc, false)
1071                }
1072            } else {
1073                (ssrc, false)
1074            };
1075
1076            state.auto_hangup = match auto_hangup {
1077                Some(true) => Some((target_ssrc, CallRecordHangupReason::BySystem)),
1078                _ => state.auto_hangup.clone(),
1079            };
1080            state.wait_input_timeout = wait_input_timeout;
1081
1082            state.current_play_id = play_id.clone();
1083            (changed, target_ssrc)
1084        };
1085
1086        if should_interrupt {
1087            let _ = self.do_interrupt(false).await;
1088        }
1089
1090        let existing_handle = self.call_state.read().await.tts_handle.clone();
1091        if let Some(tts_handle) = existing_handle {
1092            match tts_handle.try_send(play_command) {
1093                Ok(_) => return Ok(()),
1094                Err(e) => {
1095                    play_command = e.0;
1096                }
1097            }
1098        }
1099
1100        let (new_handle, tts_track) = StreamEngine::create_tts_track(
1101            self.app_state.stream_engine.clone(),
1102            self.cancel_token.child_token(),
1103            self.session_id.clone(),
1104            self.server_side_track_id.clone(),
1105            picked_ssrc,
1106            play_id.clone(),
1107            streaming,
1108            &play_command.option,
1109        )
1110        .await?;
1111
1112        new_handle.try_send(play_command)?;
1113        self.call_state.write().await.tts_handle = Some(new_handle);
1114        self.update_track_wrapper(tts_track, play_id).await;
1115        Ok(())
1116    }
1117
1118    async fn do_play(
1119        &self,
1120        url: String,
1121        play_id: Option<String>,
1122        auto_hangup: Option<bool>,
1123        wait_input_timeout: Option<u32>,
1124    ) -> Result<()> {
1125        let ssrc = rand::random::<u32>();
1126        info!(
1127            session_id = self.session_id,
1128            ssrc, url, play_id, auto_hangup, "play file track"
1129        );
1130
1131        let play_id = play_id.or(Some(url.clone()));
1132
1133        let file_track = FileTrack::new(self.server_side_track_id.clone())
1134            .with_play_id(play_id.clone())
1135            .with_ssrc(ssrc)
1136            .with_path(url)
1137            .with_cancel_token(self.cancel_token.child_token());
1138
1139        {
1140            let mut state = self.call_state.write().await;
1141            state.tts_handle = None;
1142            state.auto_hangup = match auto_hangup {
1143                Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
1144                _ => None,
1145            };
1146            state.wait_input_timeout = wait_input_timeout;
1147        }
1148
1149        self.update_track_wrapper(Box::new(file_track), play_id)
1150            .await;
1151        Ok(())
1152    }
1153
1154    async fn do_history(&self, speaker: String, text: String) -> Result<()> {
1155        self.event_sender
1156            .send(SessionEvent::AddHistory {
1157                sender: Some(self.session_id.clone()),
1158                timestamp: crate::media::get_timestamp(),
1159                speaker,
1160                text,
1161            })
1162            .map(|_| ())
1163            .map_err(Into::into)
1164    }
1165
1166    async fn do_interrupt(&self, graceful: bool) -> Result<()> {
1167        {
1168            let mut state = self.call_state.write().await;
1169            state.tts_handle = None;
1170            state.moh = None;
1171        }
1172        self.media_stream
1173            .remove_track(&self.server_side_track_id, graceful)
1174            .await;
1175        Ok(())
1176    }
1177    async fn do_pause(&self) -> Result<()> {
1178        Ok(())
1179    }
1180    async fn do_resume(&self) -> Result<()> {
1181        Ok(())
1182    }
1183    async fn do_hangup(
1184        &self,
1185        reason: Option<CallRecordHangupReason>,
1186        initiator: Option<String>,
1187        headers: Option<HashMap<String, String>>,
1188    ) -> Result<()> {
1189        info!(
1190            session_id = self.session_id,
1191            ?reason,
1192            ?initiator,
1193            ?headers,
1194            "do_hangup"
1195        );
1196
1197        // Store headers in extras if provided
1198        if let Some(headers) = headers {
1199            let h_val = serde_json::to_value(&headers).unwrap_or_default();
1200
1201            // Update ActiveCallState extras
1202            {
1203                let mut state = self.call_state.write().await;
1204                let mut extras = state.extras.take().unwrap_or_default();
1205                extras.insert("_hangup_headers".to_string(), h_val.clone());
1206                state.extras = Some(extras);
1207            }
1208
1209            // Update AppState pending_params for retrieval by PlaybookInvitationHandler (for SIP BYE)
1210            {
1211                let mut pending = self.app_state.pending_params.lock().await;
1212                let params = pending
1213                    .entry(self.session_id.clone())
1214                    .or_insert_with(std::collections::HashMap::new);
1215                params.insert("_hangup_headers".to_string(), h_val);
1216            }
1217        } else {
1218            // Check if headers are in active call state, if so, ensure they are in pending_params
1219            let state = self.call_state.read().await;
1220            if let Some(extras) = &state.extras {
1221                if let Some(h_val) = extras.get("_hangup_headers") {
1222                    let mut pending = self.app_state.pending_params.lock().await;
1223                    let params = pending
1224                        .entry(self.session_id.clone())
1225                        .or_insert_with(std::collections::HashMap::new);
1226                    params.insert("_hangup_headers".to_string(), h_val.clone());
1227                }
1228            }
1229        }
1230
1231        // Set hangup reason based on initiator and reason
1232        let hangup_reason = match initiator.as_deref() {
1233            Some("caller") => CallRecordHangupReason::ByCaller,
1234            Some("callee") => CallRecordHangupReason::ByCallee,
1235            Some("system") => CallRecordHangupReason::Autohangup,
1236            _ => reason.unwrap_or(CallRecordHangupReason::BySystem),
1237        };
1238
1239        self.media_stream
1240            .stop(Some(hangup_reason.to_string()), initiator);
1241
1242        self.call_state
1243            .write()
1244            .await
1245            .set_hangup_reason(hangup_reason);
1246        Ok(())
1247    }
1248
1249    async fn do_refer(
1250        &self,
1251        caller: String,
1252        callee: String,
1253        refer_option: Option<ReferOption>,
1254    ) -> Result<()> {
1255        self.do_interrupt(false).await.ok();
1256
1257        // Check if we should pause parent ASR
1258        let pause_parent_asr = refer_option
1259            .as_ref()
1260            .and_then(|o| o.pause_parent_asr)
1261            .unwrap_or(false);
1262
1263        // Save original ASR option for later resume
1264        let original_asr_option = if pause_parent_asr {
1265            let cs = self.call_state.read().await;
1266            cs.option.as_ref().and_then(|o| o.asr.clone())
1267        } else {
1268            None
1269        };
1270
1271        // Pause parent ASR if requested
1272        if pause_parent_asr {
1273            info!(
1274                session_id = self.session_id,
1275                "Pausing parent call ASR during refer"
1276            );
1277            self.media_stream
1278                .remove_processor::<crate::media::asr_processor::AsrProcessor>(
1279                    &self.server_side_track_id,
1280                )
1281                .await
1282                .ok();
1283        }
1284
1285        let mut moh = refer_option.as_ref().and_then(|o| o.moh.clone());
1286        if let Some(ref path) = moh {
1287            if !path.starts_with("http") && !std::path::Path::new(path).exists() {
1288                let fallback = "./config/sounds/refer_moh.wav";
1289                if std::path::Path::new(fallback).exists() {
1290                    info!(
1291                        session_id = self.session_id,
1292                        "moh {} not found, using fallback {}", path, fallback
1293                    );
1294                    moh = Some(fallback.to_string());
1295                }
1296            }
1297        }
1298        let ref_call_id = refer_option
1299            .as_ref()
1300            .and_then(|o| o.call_id.clone())
1301            .unwrap_or_else(|| format!("ref-{}-{}", rand::random::<u32>(), self.session_id));
1302
1303        let session_id = self.session_id.clone();
1304        let track_id = self.server_side_track_id.clone();
1305
1306        let recorder = {
1307            let cs = self.call_state.read().await;
1308            cs.option
1309                .as_ref()
1310                .map(|o| o.recorder.clone())
1311                .unwrap_or_default()
1312        };
1313
1314        let call_option = CallOption {
1315            caller: Some(caller),
1316            callee: Some(callee.clone()),
1317            sip: refer_option.as_ref().and_then(|o| o.sip.clone()),
1318            asr: refer_option.as_ref().and_then(|o| o.asr.clone()),
1319            denoise: refer_option.as_ref().and_then(|o| o.denoise.clone()),
1320            recorder,
1321            ..Default::default()
1322        };
1323
1324        let mut invite_option = call_option.build_invite_option()?;
1325        invite_option.call_id = Some(ref_call_id);
1326
1327        let headers = invite_option.headers.get_or_insert_with(|| Vec::new());
1328
1329        {
1330            let cs = self.call_state.read().await;
1331            if let Some(opt) = cs.option.as_ref() {
1332                if let Some(callee) = opt.callee.as_ref() {
1333                    headers.push(rsip::Header::Other(
1334                        "X-Referred-To".to_string(),
1335                        callee.clone(),
1336                    ));
1337                }
1338                if let Some(caller) = opt.caller.as_ref() {
1339                    headers.push(rsip::Header::Other(
1340                        "X-Referred-From".to_string(),
1341                        caller.clone(),
1342                    ));
1343                }
1344            }
1345        }
1346
1347        headers.push(rsip::Header::Other(
1348            "X-Referred-Id".to_string(),
1349            self.session_id.clone(),
1350        ));
1351
1352        let ssrc = rand::random::<u32>();
1353        let refer_call_state = Arc::new(RwLock::new(ActiveCallState {
1354            start_time: Utc::now(),
1355            ssrc,
1356            option: Some(call_option.clone()),
1357            is_refer: true,
1358            ..Default::default()
1359        }));
1360
1361        {
1362            let mut cs = self.call_state.write().await;
1363            cs.refer_callstate.replace(refer_call_state.clone());
1364        }
1365
1366        let auto_hangup_requested = refer_option
1367            .as_ref()
1368            .and_then(|o| o.auto_hangup)
1369            .unwrap_or(true);
1370
1371        if auto_hangup_requested {
1372            self.call_state.write().await.auto_hangup =
1373                Some((ssrc, CallRecordHangupReason::ByRefer));
1374        } else {
1375            self.call_state.write().await.auto_hangup = None;
1376        }
1377
1378        // Setup ASR resume after refer ends (if not auto_hangup and ASR was paused)
1379        if !auto_hangup_requested && pause_parent_asr && original_asr_option.is_some() {
1380            let asr_option = original_asr_option.unwrap();
1381            self.call_state.write().await.pending_asr_resume = Some((ssrc, asr_option));
1382        }
1383
1384        let timeout_secs = refer_option.as_ref().and_then(|o| o.timeout).unwrap_or(30);
1385
1386        info!(
1387            session_id = self.session_id,
1388            ssrc,
1389            auto_hangup = auto_hangup_requested,
1390            callee,
1391            timeout_secs,
1392            "do_refer"
1393        );
1394
1395        let r = tokio::time::timeout(
1396            Duration::from_secs(timeout_secs as u64),
1397            self.create_outgoing_sip_track(
1398                self.cancel_token.child_token(),
1399                refer_call_state.clone(),
1400                &track_id,
1401                invite_option,
1402                &call_option,
1403                moh,
1404                auto_hangup_requested,
1405            ),
1406        )
1407        .await;
1408
1409        {
1410            self.call_state.write().await.moh = None;
1411        }
1412
1413        let result = match r {
1414            Ok(res) => res,
1415            Err(_) => {
1416                warn!(
1417                    session_id = session_id,
1418                    "refer sip track creation timed out after {} seconds", timeout_secs
1419                );
1420                self.event_sender
1421                    .send(SessionEvent::Reject {
1422                        track_id,
1423                        timestamp: crate::media::get_timestamp(),
1424                        reason: "Timeout when refer".into(),
1425                        code: Some(408),
1426                        refer: Some(true),
1427                    })
1428                    .ok();
1429                return Err(anyhow::anyhow!("refer sip track creation timed out").into());
1430            }
1431        };
1432
1433        match result {
1434            Ok(answer) => {
1435                self.event_sender
1436                    .send(SessionEvent::Answer {
1437                        timestamp: crate::media::get_timestamp(),
1438                        track_id,
1439                        sdp: answer,
1440                        refer: Some(true),
1441                    })
1442                    .ok();
1443            }
1444            Err(e) => {
1445                warn!(
1446                    session_id = session_id,
1447                    "failed to create refer sip track: {}", e
1448                );
1449                match &e {
1450                    rsipstack::Error::DialogError(reason, _, code) => {
1451                        self.event_sender
1452                            .send(SessionEvent::Reject {
1453                                track_id,
1454                                timestamp: crate::media::get_timestamp(),
1455                                reason: reason.clone(),
1456                                code: Some(code.code() as u32),
1457                                refer: Some(true),
1458                            })
1459                            .ok();
1460                    }
1461                    _ => {}
1462                }
1463                return Err(e.into());
1464            }
1465        }
1466        Ok(())
1467    }
1468
1469    async fn do_mute(&self, track_id: Option<String>) -> Result<()> {
1470        self.media_stream.mute_track(track_id).await;
1471        Ok(())
1472    }
1473
1474    async fn do_unmute(&self, track_id: Option<String>) -> Result<()> {
1475        self.media_stream.unmute_track(track_id).await;
1476        Ok(())
1477    }
1478
1479    pub async fn cleanup(&self) -> Result<()> {
1480        self.call_state.write().await.tts_handle = None;
1481        self.media_stream.cleanup().await.ok();
1482        Ok(())
1483    }
1484
1485    pub fn get_callrecord(&self) -> Option<CallRecord> {
1486        self.call_state.try_read().ok().map(|call_state| {
1487            call_state.build_callrecord(
1488                self.app_state.clone(),
1489                self.session_id.clone(),
1490                self.call_type.clone(),
1491            )
1492        })
1493    }
1494
1495    async fn dump_to_file(
1496        &self,
1497        dump_file: &mut File,
1498        cmd_receiver: &mut CommandReceiver,
1499        event_receiver: &mut EventReceiver,
1500    ) {
1501        loop {
1502            select! {
1503                _ = self.cancel_token.cancelled() => {
1504                    break;
1505                }
1506                Ok(cmd) = cmd_receiver.recv() => {
1507                    CallRecordEvent::write(CallRecordEventType::Command, cmd, dump_file)
1508                        .await;
1509                }
1510                Ok(event) = event_receiver.recv() => {
1511                    if matches!(event, SessionEvent::Binary{..}) {
1512                        continue;
1513                    }
1514                    CallRecordEvent::write(CallRecordEventType::Event, event, dump_file)
1515                        .await;
1516                }
1517            };
1518        }
1519    }
1520
1521    async fn dump_loop(
1522        &self,
1523        dump_events: bool,
1524        mut dump_cmd_receiver: CommandReceiver,
1525        mut dump_event_receiver: EventReceiver,
1526    ) {
1527        if !dump_events {
1528            return;
1529        }
1530
1531        let file_name = self.app_state.get_dump_events_file(&self.session_id);
1532        let mut dump_file = match File::options()
1533            .create(true)
1534            .append(true)
1535            .open(&file_name)
1536            .await
1537        {
1538            Ok(file) => file,
1539            Err(e) => {
1540                warn!(
1541                    session_id = self.session_id,
1542                    file_name, "failed to open dump events file: {}", e
1543                );
1544                return;
1545            }
1546        };
1547        self.dump_to_file(
1548            &mut dump_file,
1549            &mut dump_cmd_receiver,
1550            &mut dump_event_receiver,
1551        )
1552        .await;
1553
1554        while let Ok(event) = dump_event_receiver.try_recv() {
1555            if matches!(event, SessionEvent::Binary { .. }) {
1556                continue;
1557            }
1558            CallRecordEvent::write(CallRecordEventType::Event, event, &mut dump_file).await;
1559        }
1560    }
1561
1562    pub async fn create_rtp_track(
1563        &self,
1564        track_id: TrackId,
1565        ssrc: u32,
1566        enable_srtp: Option<bool>,
1567    ) -> Result<RtcTrack> {
1568        let mut rtc_config = RtcTrackConfig::default();
1569        // Per-call flag takes precedence over global config.
1570        let use_srtp = enable_srtp
1571            .or(self.app_state.config.enable_srtp)
1572            .unwrap_or(false);
1573        rtc_config.mode = if use_srtp {
1574            rustrtc::TransportMode::Srtp
1575        } else {
1576            rustrtc::TransportMode::Rtp
1577        };
1578
1579        if let Some(codecs) = &self.app_state.config.codecs {
1580            let mut codec_types = Vec::new();
1581            for c in codecs {
1582                match c.to_lowercase().as_str() {
1583                    "pcmu" => codec_types.push(CodecType::PCMU),
1584                    "pcma" => codec_types.push(CodecType::PCMA),
1585                    "g722" => codec_types.push(CodecType::G722),
1586                    "g729" => codec_types.push(CodecType::G729),
1587                    #[cfg(feature = "opus")]
1588                    "opus" => codec_types.push(CodecType::Opus),
1589                    "dtmf" | "2833" | "telephone_event" => {
1590                        codec_types.push(CodecType::TelephoneEvent)
1591                    }
1592                    _ => {}
1593                }
1594            }
1595            if !codec_types.is_empty() {
1596                rtc_config.preferred_codec = Some(codec_types[0].clone());
1597                rtc_config.codecs = codec_types;
1598            }
1599        }
1600
1601        if rtc_config.preferred_codec.is_none() {
1602            rtc_config.preferred_codec = Some(self.track_config.codec.clone());
1603        }
1604
1605        rtc_config.rtp_port_range = self
1606            .app_state
1607            .config
1608            .rtp_start_port
1609            .zip(self.app_state.config.rtp_end_port);
1610
1611        if let Some(ref external_ip) = self.app_state.config.external_ip {
1612            rtc_config.external_ip = Some(external_ip.clone());
1613        }
1614        if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
1615            rtc_config.bind_ip = Some(bind_ip.clone());
1616        }
1617
1618        rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
1619        rtc_config.enable_ice_lite = self.app_state.config.enable_ice_lite;
1620
1621        let mut track = RtcTrack::new(
1622            self.cancel_token.child_token(),
1623            track_id,
1624            self.track_config.clone(),
1625            rtc_config,
1626        )
1627        .with_ssrc(ssrc);
1628
1629        track.create().await?;
1630
1631        Ok(track)
1632    }
1633
1634    async fn setup_caller_track(&self, option: &CallOption) -> Result<()> {
1635        let hangup_headers = option
1636            .sip
1637            .as_ref()
1638            .and_then(|s| s.hangup_headers.as_ref())
1639            .map(|headers_map| {
1640                headers_map
1641                    .iter()
1642                    .map(|(k, v)| rsip::Header::Other(k.clone(), v.clone()))
1643                    .collect::<Vec<rsip::Header>>()
1644            });
1645        self.call_state.write().await.option = Some(option.clone());
1646        info!(
1647            session_id = self.session_id,
1648            call_type = ?self.call_type,
1649            "setup caller track"
1650        );
1651
1652        let track = match self.call_type {
1653            ActiveCallType::Webrtc => Some(self.create_webrtc_track().await?),
1654            ActiveCallType::WebSocket => {
1655                let audio_receiver = self.call_state.write().await.audio_receiver.take();
1656                if let Some(receiver) = audio_receiver {
1657                    Some(self.create_websocket_track(receiver).await?)
1658                } else {
1659                    None
1660                }
1661            }
1662            ActiveCallType::Sip => {
1663                if let Some(dialog_id) = self
1664                    .invitation
1665                    .find_dialog_id_by_session_id(&self.session_id)
1666                {
1667                    if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1668                        return self
1669                            .prepare_incoming_sip_track(
1670                                self.cancel_token.clone(),
1671                                self.call_state.clone(),
1672                                &self.session_id,
1673                                pending_dialog,
1674                                hangup_headers,
1675                            )
1676                            .await;
1677                    }
1678                }
1679
1680                // Auto-inject credentials from registered users if not already provided
1681                let mut option = option.clone();
1682                if option.sip.is_none()
1683                    || option
1684                        .sip
1685                        .as_ref()
1686                        .and_then(|s| s.username.as_ref())
1687                        .is_none()
1688                {
1689                    if let Some(callee) = &option.callee {
1690                        if let Some(cred) = self.app_state.find_credentials_for_callee(callee) {
1691                            if option.sip.is_none() {
1692                                option.sip = Some(crate::SipOption {
1693                                    username: Some(cred.username.clone()),
1694                                    password: Some(cred.password.clone()),
1695                                    realm: cred.realm.clone(),
1696                                    ..Default::default()
1697                                });
1698                            }
1699                        }
1700                    }
1701                }
1702
1703                let mut invite_option = option.build_invite_option()?;
1704                invite_option.call_id = Some(self.session_id.clone());
1705
1706                match self
1707                    .create_outgoing_sip_track(
1708                        self.cancel_token.clone(),
1709                        self.call_state.clone(),
1710                        &self.session_id,
1711                        invite_option,
1712                        &option,
1713                        None,
1714                        false,
1715                    )
1716                    .await
1717                {
1718                    Ok(answer) => {
1719                        self.event_sender
1720                            .send(SessionEvent::Answer {
1721                                timestamp: crate::media::get_timestamp(),
1722                                track_id: self.session_id.clone(),
1723                                sdp: answer,
1724                                refer: Some(false),
1725                            })
1726                            .ok();
1727                        return Ok(());
1728                    }
1729                    Err(e) => {
1730                        warn!(
1731                            session_id = self.session_id,
1732                            "failed to create sip track: {}", e
1733                        );
1734                        match &e {
1735                            rsipstack::Error::DialogError(reason, _, code) => {
1736                                self.event_sender
1737                                    .send(SessionEvent::Reject {
1738                                        track_id: self.session_id.clone(),
1739                                        timestamp: crate::media::get_timestamp(),
1740                                        reason: reason.clone(),
1741                                        code: Some(code.code() as u32),
1742                                        refer: Some(false),
1743                                    })
1744                                    .ok();
1745                            }
1746                            _ => {}
1747                        }
1748                        return Err(e.into());
1749                    }
1750                }
1751            }
1752            ActiveCallType::B2bua => {
1753                if let Some(dialog_id) = self
1754                    .invitation
1755                    .find_dialog_id_by_session_id(&self.session_id)
1756                {
1757                    if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1758                        return self
1759                            .prepare_incoming_sip_track(
1760                                self.cancel_token.clone(),
1761                                self.call_state.clone(),
1762                                &self.session_id,
1763                                pending_dialog,
1764                                hangup_headers,
1765                            )
1766                            .await;
1767                    }
1768                }
1769
1770                warn!(
1771                    session_id = self.session_id,
1772                    "no pending dialog found for B2BUA call"
1773                );
1774                return Err(anyhow::anyhow!(
1775                    "no pending dialog found for session_id: {}",
1776                    self.session_id
1777                ));
1778            }
1779        };
1780        match track {
1781            Some(track) => {
1782                self.finish_caller_stack(&option, Some(track)).await?;
1783            }
1784            None => {
1785                warn!(session_id = self.session_id, "no track created for caller");
1786                return Err(anyhow::anyhow!("no track created for caller"));
1787            }
1788        }
1789        Ok(())
1790    }
1791
1792    async fn finish_caller_stack(
1793        &self,
1794        option: &CallOption,
1795        track: Option<Box<dyn Track>>,
1796    ) -> Result<()> {
1797        if let Some(track) = track {
1798            self.setup_track_with_stream(&option, track).await?;
1799        }
1800
1801        {
1802            let call_state = self.call_state.read().await;
1803            if let Some(ref answer) = call_state.answer {
1804                info!(
1805                    session_id = self.session_id,
1806                    "sending answer event: {}", answer,
1807                );
1808                self.event_sender
1809                    .send(SessionEvent::Answer {
1810                        timestamp: crate::media::get_timestamp(),
1811                        track_id: self.session_id.clone(),
1812                        sdp: answer.clone(),
1813                        refer: Some(false),
1814                    })
1815                    .ok();
1816            } else {
1817                warn!(
1818                    session_id = self.session_id,
1819                    "no answer in state to send event"
1820                );
1821            }
1822        }
1823        Ok(())
1824    }
1825
1826    pub async fn setup_track_with_stream(
1827        &self,
1828        option: &CallOption,
1829        mut track: Box<dyn Track>,
1830    ) -> Result<()> {
1831        let processors = match StreamEngine::create_processors(
1832            self.app_state.stream_engine.clone(),
1833            track.as_ref(),
1834            self.cancel_token.child_token(),
1835            self.event_sender.clone(),
1836            self.media_stream.packet_sender.clone(),
1837            option,
1838        )
1839        .await
1840        {
1841            Ok(processors) => processors,
1842            Err(e) => {
1843                warn!(
1844                    session_id = self.session_id,
1845                    "failed to prepare stream processors: {}", e
1846                );
1847                vec![]
1848            }
1849        };
1850
1851        // Add all processors from the hook
1852        for processor in processors {
1853            track.append_processor(processor);
1854        }
1855
1856        self.update_track_wrapper(track, None).await;
1857        Ok(())
1858    }
1859
1860    pub async fn update_track_wrapper(&self, mut track: Box<dyn Track>, play_id: Option<String>) {
1861        let (ambiance_opt, subscribe) = {
1862            let state = self.call_state.read().await;
1863            let mut opt = state
1864                .option
1865                .as_ref()
1866                .and_then(|o| o.ambiance.clone())
1867                .unwrap_or_default();
1868
1869            if let Some(global) = &self.app_state.config.ambiance {
1870                opt.merge(global);
1871            }
1872
1873            let subscribe = state
1874                .option
1875                .as_ref()
1876                .and_then(|o| o.subscribe)
1877                .unwrap_or_default();
1878
1879            (opt, subscribe)
1880        };
1881        if track.id() == &self.server_side_track_id && ambiance_opt.path.is_some() {
1882            match AmbianceProcessor::new(ambiance_opt).await {
1883                Ok(ambiance) => {
1884                    info!(session_id = self.session_id, "loaded ambiance processor");
1885                    track.append_processor(Box::new(ambiance));
1886                }
1887                Err(e) => {
1888                    tracing::error!("failed to load ambiance wav {}", e);
1889                }
1890            }
1891        }
1892
1893        if subscribe && self.call_type != ActiveCallType::WebSocket {
1894            let (track_index, sub_track_id) = if track.id() == &self.server_side_track_id {
1895                (0, self.server_side_track_id.clone())
1896            } else {
1897                (1, self.session_id.clone())
1898            };
1899            let sub_processor =
1900                SubscribeProcessor::new(self.event_sender.clone(), sub_track_id, track_index);
1901            track.append_processor(Box::new(sub_processor));
1902        }
1903
1904        self.call_state.write().await.current_play_id = play_id.clone();
1905        self.media_stream.update_track(track, play_id).await;
1906    }
1907
1908    pub async fn create_websocket_track(
1909        &self,
1910        audio_receiver: WebsocketBytesReceiver,
1911    ) -> Result<Box<dyn Track>> {
1912        let (ssrc, codec) = {
1913            let call_state = self.call_state.read().await;
1914            (
1915                call_state.ssrc,
1916                call_state
1917                    .option
1918                    .as_ref()
1919                    .map(|o| o.codec.clone())
1920                    .unwrap_or_default(),
1921            )
1922        };
1923
1924        let ws_track = WebsocketTrack::new(
1925            self.cancel_token.child_token(),
1926            self.session_id.clone(),
1927            self.track_config.clone(),
1928            self.event_sender.clone(),
1929            audio_receiver,
1930            codec,
1931            ssrc,
1932        );
1933
1934        {
1935            let mut call_state = self.call_state.write().await;
1936            call_state.answer_time = Some(Utc::now());
1937            call_state.answer = Some("".to_string());
1938            call_state.last_status_code = 200;
1939        }
1940
1941        Ok(Box::new(ws_track))
1942    }
1943
1944    pub(super) async fn create_webrtc_track(&self) -> Result<Box<dyn Track>> {
1945        let (ssrc, option) = {
1946            let call_state = self.call_state.read().await;
1947            (
1948                call_state.ssrc,
1949                call_state.option.clone().unwrap_or_default(),
1950            )
1951        };
1952
1953        let mut rtc_config = RtcTrackConfig::default();
1954        rtc_config.mode = rustrtc::TransportMode::WebRtc; // WebRTC
1955        rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1956
1957        if let Some(codecs) = &self.app_state.config.codecs {
1958            let mut codec_types = Vec::new();
1959            for c in codecs {
1960                match c.to_lowercase().as_str() {
1961                    "pcmu" => codec_types.push(CodecType::PCMU),
1962                    "pcma" => codec_types.push(CodecType::PCMA),
1963                    "g722" => codec_types.push(CodecType::G722),
1964                    "g729" => codec_types.push(CodecType::G729),
1965                    #[cfg(feature = "opus")]
1966                    "opus" => codec_types.push(CodecType::Opus),
1967                    "dtmf" | "2833" | "telephone_event" => {
1968                        codec_types.push(CodecType::TelephoneEvent)
1969                    }
1970                    _ => {}
1971                }
1972            }
1973            if !codec_types.is_empty() {
1974                rtc_config.preferred_codec = Some(codec_types[0].clone());
1975                rtc_config.codecs = codec_types;
1976            }
1977        }
1978
1979        if let Some(ref external_ip) = self.app_state.config.external_ip {
1980            rtc_config.external_ip = Some(external_ip.clone());
1981        }
1982        if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
1983            rtc_config.bind_ip = Some(bind_ip.clone());
1984        }
1985
1986        let mut webrtc_track = RtcTrack::new(
1987            self.cancel_token.child_token(),
1988            self.session_id.clone(),
1989            self.track_config.clone(),
1990            rtc_config,
1991        )
1992        .with_ssrc(ssrc);
1993
1994        let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1995        let offer = match option.enable_ipv6 {
1996            Some(false) | None => {
1997                strip_ipv6_candidates(option.offer.as_ref().unwrap_or(&"".to_string()))
1998            }
1999            _ => option.offer.clone().unwrap_or("".to_string()),
2000        };
2001        let answer: Option<String>;
2002        match webrtc_track.handshake(offer, timeout).await {
2003            Ok(answer_sdp) => {
2004                answer = match option.enable_ipv6 {
2005                    Some(false) | None => Some(strip_ipv6_candidates(&answer_sdp)),
2006                    Some(true) => Some(answer_sdp.to_string()),
2007                };
2008            }
2009            Err(e) => {
2010                warn!(session_id = self.session_id, "failed to setup track: {}", e);
2011                return Err(anyhow::anyhow!("Failed to setup track: {}", e));
2012            }
2013        }
2014
2015        {
2016            let mut call_state = self.call_state.write().await;
2017            call_state.answer_time = Some(Utc::now());
2018            call_state.answer = answer;
2019            call_state.last_status_code = 200;
2020        }
2021        Ok(Box::new(webrtc_track))
2022    }
2023
2024    async fn create_outgoing_sip_track(
2025        &self,
2026        cancel_token: CancellationToken,
2027        call_state_ref: ActiveCallStateRef,
2028        track_id: &String,
2029        mut invite_option: InviteOption,
2030        call_option: &CallOption,
2031        moh: Option<String>,
2032        auto_hangup: bool,
2033    ) -> Result<String, rsipstack::Error> {
2034        let ssrc = call_state_ref.read().await.ssrc;
2035        let per_call_srtp = call_option.sip.as_ref().and_then(|s| s.enable_srtp);
2036        let rtp_track = self
2037            .create_rtp_track(track_id.clone(), ssrc, per_call_srtp)
2038            .await
2039            .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
2040
2041        let offer = Some(
2042            rtp_track
2043                .local_description()
2044                .await
2045                .map_err(|e| rsipstack::Error::Error(e.to_string()))?,
2046        );
2047
2048        {
2049            let mut cs = call_state_ref.write().await;
2050            if let Some(o) = cs.option.as_mut() {
2051                o.offer = offer.clone();
2052            }
2053            cs.start_time = Utc::now();
2054        };
2055
2056        invite_option.offer = offer.clone().map(|s| s.into());
2057
2058        // Set contact to local SIP endpoint address if not already set explicitly
2059        // Check if contact is still default (no scheme set) or if host is localhost-like
2060        let needs_contact = invite_option.contact.scheme.is_none()
2061            || invite_option
2062                .contact
2063                .host_with_port
2064                .to_string()
2065                .starts_with("127.0.0.1");
2066
2067        if needs_contact {
2068            if let Some(addr) = self.invitation.dialog_layer.endpoint.get_addrs().first() {
2069                invite_option.contact = rsip::Uri {
2070                    scheme: Some(rsip::Scheme::Sip),
2071                    auth: None,
2072                    host_with_port: addr.addr.clone(),
2073                    params: vec![],
2074                    headers: vec![],
2075                };
2076            }
2077        }
2078
2079        let mut rtp_track_to_setup = Some(Box::new(rtp_track) as Box<dyn Track>);
2080
2081        if let Some(moh) = moh {
2082            let ssrc_and_moh = {
2083                let mut state = call_state_ref.write().await;
2084                state.moh = Some(moh.clone());
2085                if state.current_play_id.is_none() {
2086                    let ssrc = rand::random::<u32>();
2087                    Some((ssrc, moh.clone()))
2088                } else {
2089                    info!(
2090                        session_id = self.session_id,
2091                        "Something is playing, MOH will start after it ends"
2092                    );
2093                    None
2094                }
2095            };
2096
2097            if let Some((ssrc, moh_path)) = ssrc_and_moh {
2098                let file_track = FileTrack::new(self.server_side_track_id.clone())
2099                    .with_play_id(Some(moh_path.clone()))
2100                    .with_ssrc(ssrc)
2101                    .with_path(moh_path.clone())
2102                    .with_cancel_token(self.cancel_token.child_token());
2103                self.update_track_wrapper(Box::new(file_track), Some(moh_path))
2104                    .await;
2105            }
2106        } else {
2107            let track = rtp_track_to_setup.take().unwrap();
2108            self.setup_track_with_stream(&call_option, track)
2109                .await
2110                .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
2111        }
2112
2113        info!(
2114            session_id = self.session_id,
2115            track_id,
2116            contact = %invite_option.contact,
2117            "invite {} -> {} offer: \n{}",
2118            invite_option.caller,
2119            invite_option.callee,
2120            offer.as_ref().map(|s| s.as_str()).unwrap_or("<NO OFFER>")
2121        );
2122
2123        let (dlg_state_sender, dlg_state_receiver) =
2124            self.invitation.dialog_layer.new_dialog_state_channel();
2125
2126        let states = InviteDialogStates {
2127            is_client: true,
2128            session_id: self.session_id.clone(),
2129            track_id: track_id.clone(),
2130            event_sender: self.event_sender.clone(),
2131            media_stream: self.media_stream.clone(),
2132            call_state: call_state_ref.clone(),
2133            cancel_token,
2134            terminated_reason: None,
2135            has_early_media: false,
2136        };
2137
2138        let hangup_headers = call_option
2139            .sip
2140            .as_ref()
2141            .and_then(|s| s.hangup_headers.as_ref())
2142            .map(|headers_map| {
2143                headers_map
2144                    .iter()
2145                    .map(|(k, v)| rsip::Header::Other(k.clone(), v.clone()))
2146                    .collect::<Vec<rsip::Header>>()
2147            });
2148
2149        let mut client_dialog_handler = DialogStateReceiverGuard::new(
2150            self.invitation.dialog_layer.clone(),
2151            dlg_state_receiver,
2152            hangup_headers,
2153        );
2154
2155        crate::spawn(async move {
2156            client_dialog_handler.process_dialog(states).await;
2157        });
2158
2159        let (dialog_id, answer) = self
2160            .invitation
2161            .invite(invite_option, dlg_state_sender)
2162            .await?;
2163
2164        self.call_state.write().await.moh = None;
2165
2166        if let Some(track) = rtp_track_to_setup {
2167            info!(
2168                session_id = self.session_id,
2169                track_id, "Stopping MOH and setting up RTP track"
2170            );
2171            self.media_stream
2172                .remove_track(&self.server_side_track_id, false)
2173                .await;
2174
2175            self.setup_track_with_stream(&call_option, track)
2176                .await
2177                .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
2178        }
2179
2180        let answer = match answer {
2181            Some(answer) => {
2182                let s = String::from_utf8_lossy(&answer).to_string();
2183                if s.trim().is_empty() {
2184                    // 200 OK had no body — this is valid per RFC 3261 when the answer was
2185                    // already negotiated in a 183 Session Progress (early media).
2186                    // Fall back to the early SDP stored by the Early handler.
2187                    let cs = call_state_ref.read().await;
2188                    match cs.answer.clone() {
2189                        Some(early_sdp) if !early_sdp.is_empty() => {
2190                            info!(
2191                                session_id = self.session_id,
2192                                "200 OK has empty body; using early-media SDP from 183"
2193                            );
2194                            (early_sdp, true /* already applied */)
2195                        }
2196                        _ => {
2197                            warn!(
2198                                session_id = self.session_id,
2199                                "200 OK has empty body and no early-media SDP available"
2200                            );
2201                            (s, false)
2202                        }
2203                    }
2204                } else {
2205                    (s, false)
2206                }
2207            }
2208            None => {
2209                // No answer body at all — check if early media SDP is available before failing
2210                let cs = call_state_ref.read().await;
2211                match cs.answer.clone() {
2212                    Some(early_sdp) if !early_sdp.is_empty() => {
2213                        info!(
2214                            session_id = self.session_id,
2215                            "200 OK had no answer; using early-media SDP from 183"
2216                        );
2217                        (early_sdp, true /* already applied */)
2218                    }
2219                    _ => {
2220                        warn!(session_id = self.session_id, "no answer received");
2221                        return Err(rsipstack::Error::DialogError(
2222                            "No answer received".to_string(),
2223                            dialog_id,
2224                            rsip::StatusCode::NotAcceptableHere,
2225                        ));
2226                    }
2227                }
2228            }
2229        };
2230        let (answer, remote_description_already_applied) = answer;
2231
2232        {
2233            let mut cs = call_state_ref.write().await;
2234            if cs.answer.is_none() {
2235                cs.answer = Some(answer.clone());
2236            }
2237            if auto_hangup {
2238                cs.auto_hangup = Some((ssrc, CallRecordHangupReason::ByRefer));
2239            }
2240        }
2241        if !remote_description_already_applied {
2242            self.media_stream
2243                .update_remote_description(&track_id, &answer)
2244                .await
2245                .ok();
2246        }
2247
2248        Ok(answer)
2249    }
2250
2251    /// Detect if SDP is WebRTC format
2252    pub fn is_webrtc_sdp(sdp: &str) -> bool {
2253        (sdp.contains("a=ice-ufrag:") || sdp.contains("a=ice-pwd:"))
2254            && sdp.contains("a=fingerprint:")
2255    }
2256
2257    pub async fn setup_answer_track(
2258        &self,
2259        ssrc: u32,
2260        option: &CallOption,
2261        offer: String,
2262    ) -> Result<(String, Box<dyn Track>)> {
2263        let offer = match option.enable_ipv6 {
2264            Some(false) | None => strip_ipv6_candidates(&offer),
2265            _ => offer.clone(),
2266        };
2267
2268        let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
2269
2270        let mut media_track = if Self::is_webrtc_sdp(&offer) {
2271            let mut rtc_config = RtcTrackConfig::default();
2272            rtc_config.mode = rustrtc::TransportMode::WebRtc;
2273            rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
2274            if let Some(ref external_ip) = self.app_state.config.external_ip {
2275                rtc_config.external_ip = Some(external_ip.clone());
2276            }
2277            if let Some(ref bind_ip) = self.app_state.config.rtp_bind_ip {
2278                rtc_config.bind_ip = Some(bind_ip.clone());
2279            }
2280            rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
2281            rtc_config.enable_ice_lite = self.app_state.config.enable_ice_lite;
2282
2283            let webrtc_track = RtcTrack::new(
2284                self.cancel_token.child_token(),
2285                self.session_id.clone(),
2286                self.track_config.clone(),
2287                rtc_config,
2288            )
2289            .with_ssrc(ssrc);
2290
2291            Box::new(webrtc_track) as Box<dyn Track>
2292        } else {
2293            let per_call_srtp = option.sip.as_ref().and_then(|s| s.enable_srtp);
2294            let rtp_track = self
2295                .create_rtp_track(self.session_id.clone(), ssrc, per_call_srtp)
2296                .await?;
2297            Box::new(rtp_track) as Box<dyn Track>
2298        };
2299
2300        let answer = match media_track.handshake(offer.clone(), timeout).await {
2301            Ok(answer) => answer,
2302            Err(e) => {
2303                return Err(anyhow::anyhow!("handshake failed: {e}"));
2304            }
2305        };
2306
2307        return Ok((answer, media_track));
2308    }
2309
2310    pub async fn prepare_incoming_sip_track(
2311        &self,
2312        cancel_token: CancellationToken,
2313        call_state_ref: ActiveCallStateRef,
2314        track_id: &String,
2315        pending_dialog: PendingDialog,
2316        hangup_headers: Option<Vec<rsip::Header>>,
2317    ) -> Result<()> {
2318        let state_receiver = pending_dialog.state_receiver;
2319        //let pending_token_clone = pending_dialog.token;
2320
2321        let states = InviteDialogStates {
2322            is_client: false,
2323            session_id: self.session_id.clone(),
2324            track_id: track_id.clone(),
2325            event_sender: self.event_sender.clone(),
2326            media_stream: self.media_stream.clone(),
2327            call_state: self.call_state.clone(),
2328            cancel_token,
2329            terminated_reason: None,
2330            has_early_media: false,
2331        };
2332
2333        let initial_request = pending_dialog.dialog.initial_request();
2334        let offer = String::from_utf8_lossy(&initial_request.body).to_string();
2335
2336        let (ssrc, option) = {
2337            let call_state = call_state_ref.read().await;
2338            (
2339                call_state.ssrc,
2340                call_state.option.clone().unwrap_or_default(),
2341            )
2342        };
2343
2344        match self.setup_answer_track(ssrc, &option, offer).await {
2345            Ok((offer, track)) => {
2346                self.setup_track_with_stream(&option, track).await?;
2347                {
2348                    let mut state = self.call_state.write().await;
2349                    state.ready_to_answer = Some((offer, None, pending_dialog.dialog));
2350                }
2351            }
2352            Err(e) => {
2353                return Err(anyhow::anyhow!("error creating track: {}", e));
2354            }
2355        }
2356
2357        let mut client_dialog_handler = DialogStateReceiverGuard::new(
2358            self.invitation.dialog_layer.clone(),
2359            state_receiver,
2360            hangup_headers,
2361        );
2362
2363        crate::spawn(async move {
2364            client_dialog_handler.process_dialog(states).await;
2365        });
2366        Ok(())
2367    }
2368}
2369
2370impl Drop for ActiveCall {
2371    fn drop(&mut self) {
2372        info!(session_id = self.session_id, "dropping active call");
2373        if let Some(sender) = self.app_state.callrecord_sender.as_ref() {
2374            if let Some(record) = self.get_callrecord() {
2375                if let Err(e) = sender.send(record) {
2376                    warn!(
2377                        session_id = self.session_id,
2378                        "failed to send call record: {}", e
2379                    );
2380                }
2381            }
2382        }
2383    }
2384}
2385
2386impl ActiveCallState {
2387    pub fn merge_option(&self, mut option: CallOption) -> CallOption {
2388        if let Some(existing) = &self.option {
2389            if option.asr.is_none() {
2390                option.asr = existing.asr.clone();
2391            }
2392            if option.tts.is_none() {
2393                option.tts = existing.tts.clone();
2394            }
2395            if option.vad.is_none() {
2396                option.vad = existing.vad.clone();
2397            }
2398            if option.denoise.is_none() {
2399                option.denoise = existing.denoise;
2400            }
2401            if option.recorder.is_none() {
2402                option.recorder = existing.recorder.clone();
2403            }
2404            if option.eou.is_none() {
2405                option.eou = existing.eou.clone();
2406            }
2407            if option.extra.is_none() {
2408                option.extra = existing.extra.clone();
2409            }
2410            if option.ambiance.is_none() {
2411                option.ambiance = existing.ambiance.clone();
2412            }
2413        }
2414        option
2415    }
2416
2417    pub fn set_hangup_reason(&mut self, reason: CallRecordHangupReason) {
2418        if self.hangup_reason.is_none() {
2419            self.hangup_reason = Some(reason);
2420        }
2421    }
2422
2423    pub fn build_hangup_event(
2424        &self,
2425        track_id: TrackId,
2426        initiator: Option<String>,
2427    ) -> crate::event::SessionEvent {
2428        let from = self.option.as_ref().and_then(|o| o.caller.as_ref());
2429        let to = self.option.as_ref().and_then(|o| o.callee.as_ref());
2430        let extra = self.extras.clone();
2431
2432        crate::event::SessionEvent::Hangup {
2433            track_id,
2434            timestamp: crate::media::get_timestamp(),
2435            reason: Some(format!("{:?}", self.hangup_reason)),
2436            initiator,
2437            start_time: self.start_time.to_rfc3339(),
2438            answer_time: self.answer_time.map(|t| t.to_rfc3339()),
2439            ringing_time: self.ring_time.map(|t| t.to_rfc3339()),
2440            hangup_time: Utc::now().to_rfc3339(),
2441            extra,
2442            from: from.map(|f| f.into()),
2443            to: to.map(|f| f.into()),
2444            refer: Some(self.is_refer),
2445        }
2446    }
2447
2448    pub fn build_callrecord(
2449        &self,
2450        app_state: AppState,
2451        session_id: String,
2452        call_type: ActiveCallType,
2453    ) -> CallRecord {
2454        let option = self.option.clone().unwrap_or_default();
2455        let recorder = if option.recorder.is_some() {
2456            let recorder_file = app_state.get_recorder_file(&session_id);
2457            if std::path::Path::new(&recorder_file).exists() {
2458                let file_size = std::fs::metadata(&recorder_file)
2459                    .map(|m| m.len())
2460                    .unwrap_or(0);
2461                vec![crate::callrecord::CallRecordMedia {
2462                    track_id: session_id.clone(),
2463                    path: recorder_file,
2464                    size: file_size,
2465                    extra: None,
2466                }]
2467            } else {
2468                vec![]
2469            }
2470        } else {
2471            vec![]
2472        };
2473
2474        let dump_event_file = app_state.get_dump_events_file(&session_id);
2475        let dump_event_file = if std::path::Path::new(&dump_event_file).exists() {
2476            Some(dump_event_file)
2477        } else {
2478            None
2479        };
2480
2481        let refer_callrecord = self.refer_callstate.as_ref().and_then(|rc| {
2482            if let Ok(rc) = rc.try_read() {
2483                Some(Box::new(rc.build_callrecord(
2484                    app_state.clone(),
2485                    rc.session_id.clone(),
2486                    ActiveCallType::B2bua,
2487                )))
2488            } else {
2489                None
2490            }
2491        });
2492
2493        let caller = option.caller.clone().unwrap_or_default();
2494        let callee = option.callee.clone().unwrap_or_default();
2495
2496        CallRecord {
2497            option: Some(option),
2498            call_id: session_id,
2499            call_type,
2500            start_time: self.start_time,
2501            ring_time: self.ring_time.clone(),
2502            answer_time: self.answer_time.clone(),
2503            end_time: Utc::now(),
2504            caller,
2505            callee,
2506            hangup_reason: self.hangup_reason.clone(),
2507            hangup_messages: Vec::new(),
2508            status_code: self.last_status_code,
2509            extras: self.extras.clone(),
2510            dump_event_file,
2511            recorder,
2512            refer_callrecord,
2513        }
2514    }
2515}