active_call/call/
active_call.rs

1use super::Command;
2use crate::{
3    CallOption, ReferOption,
4    event::{EventReceiver, EventSender, SessionEvent},
5    media::{
6        TrackId,
7        ambiance::AmbianceProcessor,
8        engine::StreamEngine,
9        negotiate::strip_ipv6_candidates,
10        recorder::RecorderOption,
11        stream::{MediaStream, MediaStreamBuilder},
12        track::{
13            Track, TrackConfig,
14            file::FileTrack,
15            media_pass::MediaPassTrack,
16            rtc::{RtcTrack, RtcTrackConfig},
17            tts::SynthesisHandle,
18            websocket::{WebsocketBytesReceiver, WebsocketTrack},
19        },
20    },
21    synthesis::{SynthesisCommand, SynthesisOption},
22};
23use crate::{
24    app::AppState,
25    call::{
26        CommandReceiver, CommandSender,
27        sip::{DialogStateReceiverGuard, Invitation, InviteDialogStates},
28    },
29    callrecord::{CallRecord, CallRecordEvent, CallRecordEventType, CallRecordHangupReason},
30    useragent::invitation::PendingDialog,
31};
32use anyhow::Result;
33use audio_codec::CodecType;
34use chrono::{DateTime, Utc};
35use rsipstack::dialog::{invitation::InviteOption, server_dialog::ServerInviteDialog};
36use serde::{Deserialize, Serialize};
37use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
38use tokio::{fs::File, select, sync::Mutex, sync::RwLock, time::sleep};
39use tokio_util::sync::CancellationToken;
40use tracing::{debug, info, warn};
41
#[cfg(test)]
mod tests {
    use super::*;
    use crate::app::AppStateBuilder;
    use crate::callrecord::CallRecordHangupReason;
    use crate::config::Config;
    use crate::media::track::tts::SynthesisHandle;
    use crate::synthesis::SynthesisCommand;
    use tokio::sync::mpsc;

    /// Builds a SIP `ActiveCall` named "test-session" on top of a freshly built
    /// app state and stores a `CallOption` carrying `tts` in its call state.
    async fn sip_call_with_tts(tts: crate::synthesis::SynthesisOption) -> Result<Arc<ActiveCall>> {
        let mut config = Config::default();
        config.udp_port = 0; // Use random port
        config.media_cache_path = "/tmp/mediacache".to_string();
        let app_state = AppStateBuilder::new()
            .with_config(config)
            .with_stream_engine(Arc::new(StreamEngine::default()))
            .build()
            .await?;

        let call = Arc::new(ActiveCall::new(
            ActiveCallType::Sip,
            CancellationToken::new(),
            "test-session".to_string(),
            app_state.invitation.clone(),
            app_state.clone(),
            TrackConfig::default(),
            None,
            false,
            None,
            None,
        ));

        let mut option = crate::CallOption::default();
        option.tts = Some(tts);
        call.call_state.write().await.option = Some(option);
        Ok(call)
    }

    /// A TTS command that reuses the current play id must keep the SSRC of the
    /// existing synthesis handle for auto-hangup and feed the existing channel.
    #[tokio::test]
    async fn test_tts_ssrc_reuse_for_autohangup() -> Result<()> {
        let active_call = sip_call_with_tts(crate::synthesis::SynthesisOption::default()).await?;

        // 1. Install an initial TTS handle bound to "play_1"
        let (tx, mut rx) = mpsc::unbounded_channel::<SynthesisCommand>();
        let initial_ssrc = 12345;
        let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);
        {
            let mut state = active_call.call_state.write().await;
            state.tts_handle = Some(handle);
            state.current_play_id = Some("play_1".to_string());
        }

        // 2. Issue a TTS with auto_hangup=true and the same play_id
        let same_play_id = Some("play_1".to_string());
        active_call
            .do_tts(
                "hangup now".to_string(),
                None,
                same_play_id,
                Some(true),
                false,
                true,
                None,
                None,
                false,
            )
            .await?;

        // 3. The auto_hangup entry must carry the EXISTING SSRC
        {
            let state = active_call.call_state.read().await;
            assert!(state.auto_hangup.is_some());
            let (h_ssrc, reason) = state.auto_hangup.clone().unwrap();
            assert_eq!(
                h_ssrc, initial_ssrc,
                "SSRC should be reused from existing handle"
            );
            assert_eq!(reason, CallRecordHangupReason::BySystem);
        }

        // 4. The command must have been delivered over the existing channel
        let cmd = rx.try_recv().expect("Should have received tts command");
        assert_eq!(cmd.text, "hangup now");

        Ok(())
    }

    /// A TTS command with a different play id must not reuse the SSRC of the
    /// handle that is currently installed.
    #[tokio::test]
    async fn test_tts_new_ssrc_for_different_play_id() -> Result<()> {
        let mut tts_opt = crate::synthesis::SynthesisOption::default();
        tts_opt.provider = Some(crate::synthesis::SynthesisType::MsEdge);
        let active_call = sip_call_with_tts(tts_opt).await?;

        // Keep the receiving half alive for the whole test.
        let (tx, _rx) = mpsc::unbounded_channel();
        let initial_ssrc = 111;
        let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);
        {
            let mut state = active_call.call_state.write().await;
            state.tts_handle = Some(handle);
            state.current_play_id = Some("play_1".to_string());
        }

        // Issue a TTS with a DIFFERENT play_id
        let other_play_id = Some("play_2".to_string());
        active_call
            .do_tts(
                "new play".to_string(),
                None,
                other_play_id,
                Some(true),
                false,
                true,
                None,
                None,
                false,
            )
            .await?;

        // auto_hangup must use a NEW SSRC (the old playback is interrupted and restarted)
        {
            let state = active_call.call_state.read().await;
            let (h_ssrc, _) = state.auto_hangup.clone().unwrap();
            assert_ne!(
                h_ssrc, initial_ssrc,
                "Should use a new SSRC for different play_id"
            );
        }

        Ok(())
    }
}
206
207#[derive(Deserialize)]
208#[serde(rename_all = "camelCase")]
209pub struct CallParams {
210    pub id: Option<String>,
211    #[serde(rename = "dump")]
212    pub dump_events: Option<bool>,
213    #[serde(rename = "ping")]
214    pub ping_interval: Option<u32>,
215    pub server_side_track: Option<String>,
216}
217
218#[derive(Debug, Serialize, Deserialize, Clone, Default)]
219#[serde(rename_all = "camelCase")]
220pub enum ActiveCallType {
221    Webrtc,
222    B2bua,
223    WebSocket,
224    #[default]
225    Sip,
226}
227
/// Mutable per-call state, shared behind an `Arc<RwLock<..>>`.
///
/// `Default` is derived; `ActiveCall::new` fills in `session_id`, `start_time`,
/// `ssrc`, `extras` and `audio_receiver` and leaves every other field at its
/// default value.
228#[derive(Default)]
229pub struct ActiveCallState {
    /// Session identifier, copied from the owning call at construction.
230    pub session_id: String,
    /// Set to `Utc::now()` when the call object is created.
231    pub start_time: DateTime<Utc>,
    /// Ring timestamp, when one has been recorded.
232    pub ring_time: Option<DateTime<Utc>>,
    /// Set to `Utc::now()` once the dialog has been accepted successfully.
233    pub answer_time: Option<DateTime<Utc>>,
    /// Hangup reason, once one has been recorded.
234    pub hangup_reason: Option<CallRecordHangupReason>,
    /// NOTE(review): presumably the last SIP status code seen — confirm.
235    pub last_status_code: u16,
    /// Call options currently in effect for this call.
236    pub option: Option<CallOption>,
    /// Answer body sent when accepting the dialog (sent as `application/sdp`).
237    pub answer: Option<String>,
    /// Random `u32` chosen when the call object is created.
238    pub ssrc: u32,
    /// NOTE(review): presumably the state of a referred call leg — confirm.
239    pub refer_callstate: Option<ActiveCallStateRef>,
    /// Free-form extra values handed to `ActiveCall::new`.
240    pub extras: Option<HashMap<String, serde_json::Value>>,
    /// NOTE(review): presumably marks this state as belonging to a refer leg — confirm.
241    pub is_refer: bool,
242
243    // Runtime state (migrated from ActiveCall to reduce multiple locks)
    /// Handle of the synthesis (TTS) stream currently installed, if any.
244    pub tts_handle: Option<SynthesisHandle>,
    /// `(ssrc, reason)`: hang up with `reason` when the server-side track
    /// reports a track end carrying this `ssrc`.
245    pub auto_hangup: Option<(u32, CallRecordHangupReason)>,
    /// Pending input timeout; taken when the current playback ends and used to
    /// arm the wait-for-input timer.
246    pub wait_input_timeout: Option<u32>,
    /// Path of the audio file that is replayed whenever the server-side track
    /// ends or errors while this is set.
247    pub moh: Option<String>,
    /// Play id of the current playback; track-end events with a different
    /// play id are ignored.
248    pub current_play_id: Option<String>,
    /// Receiver handed to `ActiveCall::new`, if any.
249    pub audio_receiver: Option<WebsocketBytesReceiver>,
    /// `(answer, optional track, server dialog)` parked until the call is accepted.
250    pub ready_to_answer: Option<(String, Option<Box<dyn Track>>, ServerInviteDialog)>,
251}
252
/// Shared, reference-counted handle to an `ActiveCall`.
253pub type ActiveCallRef = Arc<ActiveCall>;
/// Shared handle to an `ActiveCallState` guarded by an async `RwLock`.
254pub type ActiveCallStateRef = Arc<RwLock<ActiveCallState>>;
255
/// One call session: its shared state, media stream, and the channels used to
/// send commands to it and receive events from it.
256pub struct ActiveCall {
    /// Mutable call state shared behind a lock.
257    pub call_state: ActiveCallStateRef,
    /// Cancelled when the call is finished; child tokens are handed to tracks
    /// and to the media stream.
258    pub cancel_token: CancellationToken,
    /// Transport flavour of this call.
259    pub call_type: ActiveCallType,
    /// Session identifier; also used as the media stream id and as the key in
    /// the application's active-call registry.
260    pub session_id: String,
    /// Media stream built for this session.
261    pub media_stream: Arc<MediaStream>,
    /// Track configuration supplied at construction.
262    pub track_config: TrackConfig,
    /// Sender side of the session event channel.
263    pub event_sender: EventSender,
    /// Application-wide state (counters, active-call registry, configuration).
264    pub app_state: AppState,
    /// Invitation handle used to look up pending dialogs for this session.
265    pub invitation: Invitation,
    /// Broadcast sender for commands, created with a capacity of 32.
266    pub cmd_sender: CommandSender,
    /// Passed to the dump loop when the call is served.
267    pub dump_events: bool,
    /// Id of the server-side track; defaults to "server-side-track".
268    pub server_side_track_id: TrackId,
269}
270
/// Keeps a call registered in the application's active-call registry for as
/// long as the guard is alive; the entry is removed when the guard is dropped.
271pub struct ActiveCallGuard {
    /// The call this guard registered.
272    pub call: ActiveCallRef,
    /// Number of registered calls right after this call was inserted.
273    pub active_calls: usize,
274}
275
276impl ActiveCallGuard {
277    pub fn new(call: ActiveCallRef) -> Self {
278        let active_calls = {
279            call.app_state
280                .total_calls
281                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
282            let mut calls = call.app_state.active_calls.lock().unwrap();
283            calls.insert(call.session_id.clone(), call.clone());
284            calls.len()
285        };
286        Self { call, active_calls }
287    }
288}
289
290impl Drop for ActiveCallGuard {
291    fn drop(&mut self) {
292        self.call
293            .app_state
294            .active_calls
295            .lock()
296            .unwrap()
297            .remove(&self.call.session_id);
298    }
299}
300
/// Receivers subscribed ahead of time for one run of `ActiveCall::serve`.
301pub struct ActiveCallReceiver {
    /// Command subscription consumed by the command dispatch loop.
302    pub cmd_receiver: CommandReceiver,
    /// Second command subscription, handed to the dump loop.
303    pub dump_cmd_receiver: CommandReceiver,
    /// Event subscription handed to the dump loop.
304    pub dump_event_receiver: EventReceiver,
305}
306
307impl ActiveCall {
308    pub fn new(
309        call_type: ActiveCallType,
310        cancel_token: CancellationToken,
311        session_id: String,
312        invitation: Invitation,
313        app_state: AppState,
314        track_config: TrackConfig,
315        audio_receiver: Option<WebsocketBytesReceiver>,
316        dump_events: bool,
317        server_side_track_id: Option<TrackId>,
318        extras: Option<HashMap<String, serde_json::Value>>,
319    ) -> Self {
320        let event_sender = crate::event::create_event_sender();
321        let cmd_sender = tokio::sync::broadcast::Sender::<Command>::new(32);
322        let media_stream_builder = MediaStreamBuilder::new(event_sender.clone())
323            .with_id(session_id.clone())
324            .with_cancel_token(cancel_token.child_token());
325        let media_stream = Arc::new(media_stream_builder.build());
326        let call_state = Arc::new(RwLock::new(ActiveCallState {
327            session_id: session_id.clone(),
328            start_time: Utc::now(),
329            ssrc: rand::random::<u32>(),
330            extras,
331            audio_receiver,
332            ..Default::default()
333        }));
334        Self {
335            cancel_token,
336            call_type,
337            session_id,
338            call_state,
339            media_stream,
340            track_config,
341            event_sender,
342            app_state,
343            invitation,
344            cmd_sender,
345            dump_events,
346            server_side_track_id: server_side_track_id.unwrap_or("server-side-track".to_string()),
347        }
348    }
349
350    pub async fn enqueue_command(&self, command: Command) -> Result<()> {
351        self.cmd_sender
352            .send(command)
353            .map_err(|e| anyhow::anyhow!("Failed to send command: {}", e))?;
354        Ok(())
355    }
356
357    /// Create a new ActiveCallReceiver for this ActiveCall
358    /// `tokio::sync::broadcast` not cached messages, so need to early create receiver
359    /// before calling `serve()`
360    pub fn new_receiver(&self) -> ActiveCallReceiver {
361        ActiveCallReceiver {
362            cmd_receiver: self.cmd_sender.subscribe(),
363            dump_cmd_receiver: self.cmd_sender.subscribe(),
364            dump_event_receiver: self.event_sender.subscribe(),
365        }
366    }
367
368    pub async fn serve(&self, receiver: ActiveCallReceiver) -> Result<()> {
369        let ActiveCallReceiver {
370            mut cmd_receiver,
371            dump_cmd_receiver,
372            dump_event_receiver,
373        } = receiver;
374
375        let process_command_loop = async move {
376            while let Ok(command) = cmd_receiver.recv().await {
377                match self.dispatch(command).await {
378                    Ok(_) => (),
379                    Err(e) => {
380                        warn!(session_id = self.session_id, "{}", e);
381                        self.event_sender
382                            .send(SessionEvent::Error {
383                                track_id: self.session_id.clone(),
384                                timestamp: crate::media::get_timestamp(),
385                                sender: "command".to_string(),
386                                error: e.to_string(),
387                                code: None,
388                            })
389                            .ok();
390                    }
391                }
392            }
393        };
394        self.app_state
395            .total_calls
396            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
397
398        tokio::join!(
399            self.dump_loop(self.dump_events, dump_cmd_receiver, dump_event_receiver),
400            async {
401                select! {
402                    _ = process_command_loop => {
403                        info!(session_id = self.session_id, "command loop done");
404                    }
405                    _ = self.process() => {
406                        info!(session_id = self.session_id, "call serve done");
407                    }
408                    _ = self.cancel_token.cancelled() => {
409                        info!(session_id = self.session_id, "call cancelled - cleaning up resources");
410                    }
411                }
412                self.cancel_token.cancel();
413            }
414        );
415        Ok(())
416    }
417
/// Drives the call's media stream and reacts to session events.
///
/// Three futures are raced with `select!` and the function returns `Ok(())`
/// as soon as any of them finishes:
/// * a poller that checks the wait-for-input timer every 100 ms and emits a
///   `SessionEvent::Silence` on the server-side track once it has expired,
/// * `media_stream.serve()`,
/// * an event hook loop that handles track end, interrupt, inactivity and
///   error events (the loop stops on the first receive error).
///
/// The timer is the pair `(start timestamp, timeout)`; `(0, 0)` means it is
/// disarmed. NOTE(review): timestamps come from `crate::media::get_timestamp()`,
/// presumably milliseconds — confirm.
418    async fn process(&self) -> Result<()> {
419        let mut event_receiver = self.event_sender.subscribe();
420
        // Shared between the poller (reads/resets) and the event hook loop (arms/disarms).
421        let input_timeout_expire = Arc::new(Mutex::new((0u64, 0u32)));
422        let input_timeout_expire_ref = input_timeout_expire.clone();
423        let event_sender = self.event_sender.clone();
424        let wait_input_timeout_loop = async {
425            loop {
                // Copy the pair out so the lock is not held across the rest of the iteration.
426                let (start_time, expire) = { *input_timeout_expire.lock().await };
427                if expire > 0 && crate::media::get_timestamp() >= start_time + expire as u64 {
428                    info!(session_id = self.session_id, "wait input timeout reached");
                    // Disarm first so the silence event is emitted once per arming.
429                    *input_timeout_expire.lock().await = (0, 0);
430                    event_sender
431                        .send(SessionEvent::Silence {
432                            track_id: self.server_side_track_id.clone(),
433                            timestamp: crate::media::get_timestamp(),
434                            start_time,
435                            duration: expire as u64,
436                            samples: None,
437                        })
438                        .ok();
439                }
440                sleep(Duration::from_millis(100)).await;
441            }
442        };
443        let server_side_track_id = self.server_side_track_id.clone();
444        let event_hook_loop = async move {
445            while let Ok(event) = event_receiver.recv().await {
446                match event {
                    // Any of these events disarms the wait-for-input timer.
447                    SessionEvent::Speaking { .. }
448                    | SessionEvent::Dtmf { .. }
449                    | SessionEvent::AsrDelta { .. }
450                    | SessionEvent::AsrFinal { .. }
451                    | SessionEvent::TrackStart { .. } => {
452                        *input_timeout_expire_ref.lock().await = (0, 0);
453                    }
454                    SessionEvent::TrackEnd {
455                        track_id,
456                        play_id,
457                        ssrc,
458                        ..
459                    } => {
                        // Only the server-side (playback) track is of interest here.
460                        if track_id != server_side_track_id {
461                            continue;
462                        }
463
                        // Snapshot what is needed under the write lock and release it
                        // before doing any further async work.
464                        let (moh_path, auto_hangup, wait_timeout_val) = {
465                            let mut state = self.call_state.write().await;
                            // A track end for a play id that is no longer current belongs
                            // to a playback that was replaced; ignore it.
466                            if play_id != state.current_play_id {
467                                debug!(
468                                    session_id = self.session_id,
469                                    ?play_id,
470                                    current = ?state.current_play_id,
471                                    "ignoring interrupted track end"
472                                );
473                                continue;
474                            }
475                            state.current_play_id = None;
476                            (
477                                state.moh.clone(),
478                                state.auto_hangup.clone(),
479                                state.wait_input_timeout.take(),
480                            )
481                        };
482
                        // While `moh` is set, restart the file instead of hanging up
                        // or arming the input timer.
483                        if let Some(path) = moh_path {
484                            info!(session_id = self.session_id, "looping moh: {}", path);
485                            let ssrc = rand::random::<u32>();
486                            let file_track = FileTrack::new(self.server_side_track_id.clone())
487                                .with_play_id(Some(path.clone()))
488                                .with_ssrc(ssrc)
489                                .with_path(path.clone())
490                                .with_cancel_token(self.cancel_token.child_token());
491                            self.update_track_wrapper(Box::new(file_track), Some(path))
492                                .await;
493                            continue;
494                        }
495
                        // Hang up only if the track that ended is the one auto-hangup was armed for.
496                        if let Some((hangup_ssrc, hangup_reason)) = auto_hangup {
497                            if hangup_ssrc == ssrc {
498                                info!(
499                                    session_id = self.session_id,
500                                    ssrc, "auto hangup when track end track_id:{}", track_id
501                                );
502                                self.do_hangup(Some(hangup_reason), None).await.ok();
503                            }
504                        }
505
                        // Arm the wait-for-input timer from now; a timeout of 0 disarms it.
506                        if let Some(timeout) = wait_timeout_val {
507                            let expire = if timeout > 0 {
508                                (crate::media::get_timestamp(), timeout)
509                            } else {
510                                (0, 0)
511                            };
512                            *input_timeout_expire_ref.lock().await = expire;
513                        }
514                    }
515                    SessionEvent::Interrupt { receiver } => {
                        // An interrupt without an explicit receiver targets the server-side track.
516                        let track_id =
517                            receiver.unwrap_or_else(|| self.server_side_track_id.clone());
518                        if track_id == self.server_side_track_id {
519                            debug!(
520                                session_id = self.session_id,
521                                "received interrupt event, stopping playback"
522                            );
523                            self.do_interrupt(true).await.ok();
524                        }
525                    }
526                    SessionEvent::Inactivity { track_id, .. } => {
527                        info!(
528                            session_id = self.session_id,
529                            track_id, "inactivity timeout reached, hanging up"
530                        );
531                        self.do_hangup(Some(CallRecordHangupReason::InactivityTimeout), None)
532                            .await
533                            .ok();
534                    }
535                    SessionEvent::Error { track_id, .. } => {
536                        if track_id != server_side_track_id {
537                            continue;
538                        }
539
                        // If `moh` is set, pick the file to play next: the fallback file
                        // when it exists and is not already in use, otherwise the same path again.
540                        let moh_info = {
541                            let mut state = self.call_state.write().await;
542                            if let Some(path) = state.moh.clone() {
543                                let fallback = "./config/sounds/refer_moh.wav".to_string();
544                                let next_path = if path != fallback
545                                    && std::path::Path::new(&fallback).exists()
546                                {
547                                    info!(
548                                        session_id = self.session_id,
549                                        "moh error, switching to fallback: {}", fallback
550                                    );
551                                    state.moh = Some(fallback.clone());
552                                    fallback
553                                } else {
554                                    info!(
555                                        session_id = self.session_id,
556                                        "looping moh on error: {}", path
557                                    );
558                                    path
559                                };
560                                Some(next_path)
561                            } else {
562                                None
563                            }
564                        };
565
566                        if let Some(next_path) = moh_info {
567                            let ssrc = rand::random::<u32>();
568                            let file_track = FileTrack::new(self.server_side_track_id.clone())
569                                .with_play_id(Some(next_path.clone()))
570                                .with_ssrc(ssrc)
571                                .with_path(next_path.clone())
572                                .with_cancel_token(self.cancel_token.child_token());
573                            self.update_track_wrapper(Box::new(file_track), Some(next_path))
574                                .await;
575                            continue;
576                        }
577                    }
578                    _ => {}
579                }
580            }
581        };
582
        // The first future to finish ends the call processing; the others are dropped.
583        select! {
584            _ = wait_input_timeout_loop=>{
585                info!(session_id = self.session_id, "wait input timeout loop done");
586            }
587            _ = self.media_stream.serve() => {
588                info!(session_id = self.session_id, "media stream loop done");
589            }
590            _ = event_hook_loop => {
591                info!(session_id = self.session_id, "event loop done");
592            }
593        }
594        Ok(())
595    }
596
597    async fn dispatch(&self, command: Command) -> Result<()> {
598        match command {
599            Command::Invite { option } => self.do_invite(option).await,
600            Command::Accept { option } => self.do_accept(option).await,
601            Command::Reject { reason, code } => {
602                self.do_reject(code.map(|c| (c as u16).into()), Some(reason))
603                    .await
604            }
605            Command::Ringing {
606                ringtone,
607                recorder,
608                early_media,
609            } => self.do_ringing(ringtone, recorder, early_media).await,
610            Command::Tts {
611                text,
612                speaker,
613                play_id,
614                auto_hangup,
615                streaming,
616                end_of_stream,
617                option,
618                wait_input_timeout,
619                base64,
620            } => {
621                self.do_tts(
622                    text,
623                    speaker,
624                    play_id,
625                    auto_hangup,
626                    streaming.unwrap_or_default(),
627                    end_of_stream.unwrap_or_default(),
628                    option,
629                    wait_input_timeout,
630                    base64.unwrap_or_default(),
631                )
632                .await
633            }
634            Command::Play {
635                url,
636                play_id,
637                auto_hangup,
638                wait_input_timeout,
639            } => {
640                self.do_play(url, play_id, auto_hangup, wait_input_timeout)
641                    .await
642            }
643            Command::Hangup { reason, initiator } => {
644                let reason = reason.map(|r| {
645                    r.parse::<CallRecordHangupReason>()
646                        .unwrap_or(CallRecordHangupReason::BySystem)
647                });
648                self.do_hangup(reason, initiator).await
649            }
650            Command::Refer {
651                caller,
652                callee,
653                options,
654            } => self.do_refer(caller, callee, options).await,
655            Command::Mute { track_id } => self.do_mute(track_id).await,
656            Command::Unmute { track_id } => self.do_unmute(track_id).await,
657            Command::Pause {} => self.do_pause().await,
658            Command::Resume {} => self.do_resume().await,
659            Command::Interrupt {
660                graceful: passage,
661                fade_out_ms: _,
662            } => self.do_interrupt(passage.unwrap_or_default()).await,
663            Command::History { speaker, text } => self.do_history(speaker, text).await,
664        }
665    }
666
667    fn build_record_option(&self, option: &CallOption) -> Option<RecorderOption> {
668        if let Some(recorder_option) = &option.recorder {
669            let mut recorder_file = recorder_option.recorder_file.clone();
670            if recorder_file.contains("{id}") {
671                recorder_file = recorder_file.replace("{id}", &self.session_id);
672            }
673
674            let recorder_file = if recorder_file.is_empty() {
675                self.app_state.get_recorder_file(&self.session_id)
676            } else {
677                let p = Path::new(&recorder_file);
678                p.is_absolute()
679                    .then(|| recorder_file.clone())
680                    .unwrap_or_else(|| self.app_state.get_recorder_file(&recorder_file))
681            };
682            info!(
683                session_id = self.session_id,
684                recorder_file, "created recording file"
685            );
686
687            let track_samplerate = self.track_config.samplerate;
688            let recorder_samplerate = if track_samplerate > 0 {
689                track_samplerate
690            } else {
691                recorder_option.samplerate
692            };
693            let recorder_ptime = if recorder_option.ptime == 0 {
694                200
695            } else {
696                recorder_option.ptime
697            };
698            let requested_format = recorder_option
699                .format
700                .unwrap_or(self.app_state.config.recorder_format());
701            let format = requested_format.effective();
702            if requested_format != format {
703                warn!(
704                    session_id = self.session_id,
705                    requested = requested_format.extension(),
706                    "Recorder format fallback to wav due to unsupported feature"
707                );
708            }
709            let mut recorder_config = RecorderOption {
710                recorder_file,
711                samplerate: recorder_samplerate,
712                ptime: recorder_ptime,
713                format: Some(format),
714            };
715            recorder_config.ensure_path_extension(format);
716            Some(recorder_config)
717        } else {
718            None
719        }
720    }
721
722    async fn invite_or_accept(&self, mut option: CallOption, sender: String) -> Result<CallOption> {
723        // Merge with existing configuration (e.g., from playbook)
724        {
725            let state = self.call_state.read().await;
726            option = state.merge_option(option);
727        }
728
729        option.check_default();
730        if let Some(opt) = self.build_record_option(&option) {
731            self.media_stream.update_recorder_option(opt).await;
732        }
733
734        if let Some(opt) = &option.media_pass {
735            let track_id = self.server_side_track_id.clone();
736            let cancel_token = self.cancel_token.child_token();
737            let ssrc = rand::random::<u32>();
738            let media_pass_track = MediaPassTrack::new(
739                self.session_id.clone(),
740                ssrc,
741                track_id,
742                cancel_token,
743                opt.clone(),
744            );
745            self.update_track_wrapper(Box::new(media_pass_track), None)
746                .await;
747        }
748
749        info!(
750            session_id = self.session_id,
751            call_type = ?self.call_type,
752            sender,
753            ?option,
754            "caller with option"
755        );
756
757        match self.setup_caller_track(&option).await {
758            Ok(_) => return Ok(option),
759            Err(e) => {
760                self.app_state
761                    .total_failed_calls
762                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
763                let error_event = crate::event::SessionEvent::Error {
764                    track_id: self.session_id.clone(),
765                    timestamp: crate::media::get_timestamp(),
766                    sender,
767                    error: e.to_string(),
768                    code: None,
769                };
770                self.event_sender.send(error_event).ok();
771                self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
772                    .await
773                    .ok();
774                return Err(e);
775            }
776        }
777    }
778
779    async fn do_invite(&self, option: CallOption) -> Result<()> {
780        self.invite_or_accept(option, "invite".to_string())
781            .await
782            .map(|_| ())
783    }
784
785    async fn do_accept(&self, mut option: CallOption) -> Result<()> {
786        let has_pending = self
787            .invitation
788            .find_dialog_id_by_session_id(&self.session_id)
789            .is_some();
790        let ready_to_answer_val = {
791            let state = self.call_state.read().await;
792            state.ready_to_answer.is_none()
793        };
794
795        if ready_to_answer_val {
796            if !has_pending {
797                // emit reject event
798                warn!(session_id = self.session_id, "no pending call to accept");
799                let rejet_event = crate::event::SessionEvent::Reject {
800                    track_id: self.session_id.clone(),
801                    timestamp: crate::media::get_timestamp(),
802                    reason: "no pending call".to_string(),
803                    refer: None,
804                    code: Some(486),
805                };
806                self.event_sender.send(rejet_event).ok();
807                self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
808                    .await
809                    .ok();
810                return Err(anyhow::anyhow!("no pending call to accept"));
811            }
812            option = self.invite_or_accept(option, "accept".to_string()).await?;
813        } else {
814            option.check_default();
815            self.call_state.write().await.option = Some(option.clone());
816        }
817        info!(session_id = self.session_id, ?option, "accepting call");
818        let ready = self.call_state.write().await.ready_to_answer.take();
819        if let Some((answer, track, dialog)) = ready {
820            info!(
821                session_id = self.session_id,
822                track_id = track.as_ref().map(|t| t.id()),
823                "ready to answer with track"
824            );
825
826            let headers = vec![rsip::Header::ContentType(
827                "application/sdp".to_string().into(),
828            )];
829
830            match dialog.accept(Some(headers), Some(answer.as_bytes().to_vec())) {
831                Ok(_) => {
832                    {
833                        let mut state = self.call_state.write().await;
834                        state.answer = Some(answer);
835                        state.answer_time = Some(Utc::now());
836                    }
837                    self.finish_caller_stack(&option, track).await?;
838                }
839                Err(e) => {
840                    warn!(session_id = self.session_id, "failed to accept call: {}", e);
841                    return Err(anyhow::anyhow!("failed to accept call"));
842                }
843            }
844        }
845        return Ok(());
846    }
847
848    async fn do_reject(
849        &self,
850        code: Option<rsip::StatusCode>,
851        reason: Option<String>,
852    ) -> Result<()> {
853        match self
854            .invitation
855            .find_dialog_id_by_session_id(&self.session_id)
856        {
857            Some(id) => {
858                info!(
859                    session_id = self.session_id,
860                    ?reason,
861                    ?code,
862                    "rejecting call"
863                );
864                self.invitation.hangup(id, code, reason).await
865            }
866            None => Ok(()),
867        }
868    }
869
870    async fn do_ringing(
871        &self,
872        ringtone: Option<String>,
873        recorder: Option<RecorderOption>,
874        early_media: Option<bool>,
875    ) -> Result<()> {
876        let ready_to_answer_val = self.call_state.read().await.ready_to_answer.is_none();
877        if ready_to_answer_val {
878            let option = CallOption {
879                recorder,
880                ..Default::default()
881            };
882            let _ = self.invite_or_accept(option, "ringing".to_string()).await?;
883        }
884
885        let state = self.call_state.read().await;
886        if let Some((answer, _, dialog)) = state.ready_to_answer.as_ref() {
887            let (headers, body) = if early_media.unwrap_or_default() || ringtone.is_some() {
888                let headers = vec![rsip::Header::ContentType(
889                    "application/sdp".to_string().into(),
890                )];
891                (Some(headers), Some(answer.as_bytes().to_vec()))
892            } else {
893                (None, None)
894            };
895
896            dialog.ringing(headers, body).ok();
897            info!(
898                session_id = self.session_id,
899                ringtone, early_media, "playing ringtone"
900            );
901            if let Some(ringtone_url) = ringtone {
902                drop(state);
903                self.do_play(ringtone_url, None, None, None).await.ok();
904            } else {
905                info!(session_id = self.session_id, "no ringtone to play");
906            }
907        }
908        Ok(())
909    }
910
    /// Handles a `tts` command: sends `text` to the session's synthesis track,
    /// creating a new TTS track when none can take the command.
    ///
    /// * `speaker` falls back to the speaker of the effective TTS option.
    /// * `play_id` identifies the playback; a different `play_id` than the
    ///   current one while a TTS handle exists interrupts the running playback.
    /// * `auto_hangup` of `Some(true)` arms auto-hangup for the chosen SSRC;
    ///   any other value keeps the previously stored setting.
    /// * `option` is merged into the call-level TTS option, or used on its own
    ///   when the call has none.
    /// * `wait_input_timeout` is stored in the call state as given.
    /// * `streaming`, `end_of_stream` and `base64` are forwarded in the
    ///   `SynthesisCommand`.
    ///
    /// # Errors
    /// Fails when neither the call nor the command provides a TTS option, when
    /// the TTS track cannot be created, or when the command cannot be queued on
    /// the freshly created handle.
    async fn do_tts(
        &self,
        text: String,
        speaker: Option<String>,
        play_id: Option<String>,
        auto_hangup: Option<bool>,
        streaming: bool,
        end_of_stream: bool,
        option: Option<SynthesisOption>,
        wait_input_timeout: Option<u32>,
        base64: bool,
    ) -> Result<()> {
        // Resolve the effective synthesis option: call-level option merged with
        // the per-command one, or the per-command one alone.
        let tts_option = {
            let call_state = self.call_state.read().await;
            match call_state.option.clone().unwrap_or_default().tts {
                Some(opt) => opt.merge_with(option),
                None => {
                    if let Some(opt) = option {
                        opt
                    } else {
                        return Err(anyhow::anyhow!("no tts option available"));
                    }
                }
            }
        };
        let speaker = match speaker {
            Some(s) => Some(s),
            None => tts_option.speaker.clone(),
        };

        let mut play_command = SynthesisCommand {
            text,
            speaker,
            play_id: play_id.clone(),
            streaming,
            end_of_stream,
            option: tts_option,
            base64,
        };
        // Only the first 10 characters of the text are logged.
        info!(
            session_id = self.session_id,
            provider = ?play_command.option.provider,
            text = %play_command.text.chars().take(10).collect::<String>(),
            speaker = play_command.speaker.as_deref(),
            auto_hangup = auto_hangup.unwrap_or_default(),
            play_id = play_command.play_id.as_deref(),
            streaming = play_command.streaming,
            end_of_stream = play_command.end_of_stream,
            wait_input_timeout = wait_input_timeout.unwrap_or_default(),
            is_base64 = play_command.base64,
            "new synthesis"
        );

        // Candidate SSRC, used only when the existing handle's SSRC is not reused.
        let ssrc = rand::random::<u32>();
        let (should_interrupt, picked_ssrc) = {
            let mut state = self.call_state.write().await;

            // With an existing handle: a new, different play_id means a new
            // playback (fresh SSRC + interrupt); otherwise keep the handle's SSRC.
            // Without a handle: use the fresh SSRC, nothing to interrupt.
            let (target_ssrc, changed) = if let Some(handle) = &state.tts_handle {
                if play_id.is_some() && state.current_play_id != play_id {
                    (ssrc, true)
                } else {
                    (handle.ssrc, false)
                }
            } else {
                (ssrc, false)
            };

            // Auto-hangup is tied to the SSRC that will carry this playback.
            state.auto_hangup = match auto_hangup {
                Some(true) => Some((target_ssrc, CallRecordHangupReason::BySystem)),
                _ => state.auto_hangup.clone(),
            };
            state.wait_input_timeout = wait_input_timeout;

            state.current_play_id = play_id.clone();
            (changed, target_ssrc)
        };

        if should_interrupt {
            // `do_interrupt` clears `tts_handle`, so the lookup below finds none.
            let _ = self.do_interrupt(false).await;
        }

        // Prefer queueing on the existing handle; on failure take the command
        // back out of the error and fall through to creating a new track.
        let existing_handle = self.call_state.read().await.tts_handle.clone();
        if let Some(tts_handle) = existing_handle {
            match tts_handle.try_send(play_command) {
                Ok(_) => return Ok(()),
                Err(e) => {
                    play_command = e.0;
                }
            }
        }

        let (new_handle, tts_track) = StreamEngine::create_tts_track(
            self.app_state.stream_engine.clone(),
            self.cancel_token.child_token(),
            self.session_id.clone(),
            self.server_side_track_id.clone(),
            picked_ssrc,
            play_id.clone(),
            streaming,
            &play_command.option,
        )
        .await?;

        // Queue the command before publishing the handle and installing the track.
        new_handle.try_send(play_command)?;
        self.call_state.write().await.tts_handle = Some(new_handle);
        self.update_track_wrapper(tts_track, play_id).await;
        Ok(())
    }
1019
1020    async fn do_play(
1021        &self,
1022        url: String,
1023        play_id: Option<String>,
1024        auto_hangup: Option<bool>,
1025        wait_input_timeout: Option<u32>,
1026    ) -> Result<()> {
1027        let ssrc = rand::random::<u32>();
1028        info!(
1029            session_id = self.session_id,
1030            ssrc, url, play_id, auto_hangup, "play file track"
1031        );
1032
1033        let play_id = play_id.or(Some(url.clone()));
1034
1035        let file_track = FileTrack::new(self.server_side_track_id.clone())
1036            .with_play_id(play_id.clone())
1037            .with_ssrc(ssrc)
1038            .with_path(url)
1039            .with_cancel_token(self.cancel_token.child_token());
1040
1041        {
1042            let mut state = self.call_state.write().await;
1043            state.tts_handle = None;
1044            state.auto_hangup = match auto_hangup {
1045                Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
1046                _ => None,
1047            };
1048            state.wait_input_timeout = wait_input_timeout;
1049        }
1050
1051        self.update_track_wrapper(Box::new(file_track), play_id)
1052            .await;
1053        Ok(())
1054    }
1055
1056    async fn do_history(&self, speaker: String, text: String) -> Result<()> {
1057        self.event_sender
1058            .send(SessionEvent::AddHistory {
1059                sender: Some(self.session_id.clone()),
1060                timestamp: crate::media::get_timestamp(),
1061                speaker,
1062                text,
1063            })
1064            .map(|_| ())
1065            .map_err(Into::into)
1066    }
1067
1068    async fn do_interrupt(&self, graceful: bool) -> Result<()> {
1069        {
1070            let mut state = self.call_state.write().await;
1071            state.tts_handle = None;
1072            state.moh = None;
1073        }
1074        self.media_stream
1075            .remove_track(&self.server_side_track_id, graceful)
1076            .await;
1077        Ok(())
1078    }
1079    async fn do_pause(&self) -> Result<()> {
1080        Ok(())
1081    }
1082    async fn do_resume(&self) -> Result<()> {
1083        Ok(())
1084    }
1085    async fn do_hangup(
1086        &self,
1087        reason: Option<CallRecordHangupReason>,
1088        initiator: Option<String>,
1089    ) -> Result<()> {
1090        info!(
1091            session_id = self.session_id,
1092            ?reason,
1093            ?initiator,
1094            "do_hangup"
1095        );
1096
1097        // Set hangup reason based on initiator and reason
1098        let hangup_reason = match initiator.as_deref() {
1099            Some("caller") => CallRecordHangupReason::ByCaller,
1100            Some("callee") => CallRecordHangupReason::ByCallee,
1101            Some("system") => CallRecordHangupReason::Autohangup,
1102            _ => reason.unwrap_or(CallRecordHangupReason::BySystem),
1103        };
1104
1105        self.media_stream
1106            .stop(Some(hangup_reason.to_string()), initiator);
1107
1108        self.call_state
1109            .write()
1110            .await
1111            .set_hangup_reason(hangup_reason);
1112        Ok(())
1113    }
1114
    /// Handles a `refer` command: places an outgoing SIP call from `caller` to
    /// `callee` on the server-side track and reports the outcome as a session
    /// event flagged with `refer: Some(true)`.
    ///
    /// Steps, in order:
    /// 1. Interrupt whatever is playing on the server-side track.
    /// 2. Resolve the music-on-hold source from `refer_option`, with a local
    ///    fallback file.
    /// 3. Build the `CallOption`/invite option for the new leg, carrying over
    ///    the current call's recorder option and adding `X-Referred-*` headers.
    /// 4. Register a fresh call state for the referred leg and configure
    ///    auto-hangup (enabled unless `refer_option.auto_hangup` says otherwise).
    /// 5. Create the outgoing SIP track, bounded by `refer_option.timeout`
    ///    seconds (default 30).
    ///
    /// # Errors
    /// Fails when the invite option cannot be built, when track creation times
    /// out (a `Reject` event with code 408 is emitted), or when track creation
    /// fails (a `Reject` event is emitted for dialog errors only).
    async fn do_refer(
        &self,
        caller: String,
        callee: String,
        refer_option: Option<ReferOption>,
    ) -> Result<()> {
        self.do_interrupt(false).await.ok();
        let mut moh = refer_option.as_ref().and_then(|o| o.moh.clone());
        // A non-HTTP moh path that does not exist locally is replaced by the
        // bundled fallback, provided the fallback itself exists.
        if let Some(ref path) = moh {
            if !path.starts_with("http") && !std::path::Path::new(path).exists() {
                let fallback = "./config/sounds/refer_moh.wav";
                if std::path::Path::new(fallback).exists() {
                    info!(
                        session_id = self.session_id,
                        "moh {} not found, using fallback {}", path, fallback
                    );
                    moh = Some(fallback.to_string());
                }
            }
        }
        let session_id = self.session_id.clone();
        let track_id = self.server_side_track_id.clone();

        // Reuse the recorder option of the current call for the referred leg.
        let recorder = {
            let cs = self.call_state.read().await;
            cs.option
                .as_ref()
                .map(|o| o.recorder.clone())
                .unwrap_or_default()
        };

        let call_option = CallOption {
            caller: Some(caller),
            callee: Some(callee.clone()),
            sip: refer_option.as_ref().and_then(|o| o.sip.clone()),
            asr: refer_option.as_ref().and_then(|o| o.asr.clone()),
            denoise: refer_option.as_ref().and_then(|o| o.denoise.clone()),
            recorder,
            ..Default::default()
        };

        let mut invite_option = call_option.build_invite_option()?;
        invite_option.call_id = Some(self.session_id.clone());

        let headers = invite_option.headers.get_or_insert_with(|| Vec::new());

        // Expose the original call's parties to the referred leg.
        {
            let cs = self.call_state.read().await;
            if let Some(opt) = cs.option.as_ref() {
                if let Some(callee) = opt.callee.as_ref() {
                    headers.push(rsip::Header::Other(
                        "X-Referred-To".to_string(),
                        callee.clone(),
                    ));
                }
                if let Some(caller) = opt.caller.as_ref() {
                    headers.push(rsip::Header::Other(
                        "X-Referred-From".to_string(),
                        caller.clone(),
                    ));
                }
            }
        }

        headers.push(rsip::Header::Other(
            "X-Referred-Id".to_string(),
            self.session_id.clone(),
        ));

        // The referred leg gets its own call state and a fresh random SSRC.
        let ssrc = rand::random::<u32>();
        let refer_call_state = Arc::new(RwLock::new(ActiveCallState {
            start_time: Utc::now(),
            ssrc,
            option: Some(call_option.clone()),
            is_refer: true,
            ..Default::default()
        }));

        {
            let mut cs = self.call_state.write().await;
            cs.refer_callstate.replace(refer_call_state.clone());
        }

        // Auto-hangup defaults to enabled when the refer option does not say.
        let auto_hangup_requested = refer_option
            .as_ref()
            .and_then(|o| o.auto_hangup)
            .unwrap_or(true);

        if auto_hangup_requested {
            self.call_state.write().await.auto_hangup =
                Some((ssrc, CallRecordHangupReason::ByRefer));
        } else {
            self.call_state.write().await.auto_hangup = None;
        }

        let timeout_secs = refer_option.as_ref().and_then(|o| o.timeout).unwrap_or(30);

        info!(
            session_id = self.session_id,
            ssrc,
            auto_hangup = auto_hangup_requested,
            callee,
            timeout_secs,
            "do_refer"
        );

        // Outer `Result` is the timeout, inner `Result` is the track creation.
        let r = tokio::time::timeout(
            Duration::from_secs(timeout_secs as u64),
            self.create_outgoing_sip_track(
                self.cancel_token.child_token(),
                refer_call_state.clone(),
                &track_id,
                invite_option,
                &call_option,
                moh,
                auto_hangup_requested,
            ),
        )
        .await;

        // Whatever the outcome, the stored moh entry is cleared afterwards.
        {
            self.call_state.write().await.moh = None;
        }

        let result = match r {
            Ok(res) => res,
            Err(_) => {
                warn!(
                    session_id = session_id,
                    "refer sip track creation timed out after {} seconds", timeout_secs
                );
                self.event_sender
                    .send(SessionEvent::Reject {
                        track_id,
                        timestamp: crate::media::get_timestamp(),
                        reason: "Timeout when refer".into(),
                        code: Some(408),
                        refer: Some(true),
                    })
                    .ok();
                return Err(anyhow::anyhow!("refer sip track creation timed out").into());
            }
        };

        match result {
            Ok(answer) => {
                self.event_sender
                    .send(SessionEvent::Answer {
                        timestamp: crate::media::get_timestamp(),
                        track_id,
                        sdp: answer,
                        refer: Some(true),
                    })
                    .ok();
            }
            Err(e) => {
                warn!(
                    session_id = session_id,
                    "failed to create refer sip track: {}", e
                );
                // Only dialog errors carry a reason/status to report as a Reject.
                match &e {
                    rsipstack::Error::DialogError(reason, _, code) => {
                        self.event_sender
                            .send(SessionEvent::Reject {
                                track_id,
                                timestamp: crate::media::get_timestamp(),
                                reason: reason.clone(),
                                code: Some(code.code() as u32),
                                refer: Some(true),
                            })
                            .ok();
                    }
                    _ => {}
                }
                return Err(e.into());
            }
        }
        Ok(())
    }
1294
1295    async fn do_mute(&self, track_id: Option<String>) -> Result<()> {
1296        self.media_stream.mute_track(track_id).await;
1297        Ok(())
1298    }
1299
1300    async fn do_unmute(&self, track_id: Option<String>) -> Result<()> {
1301        self.media_stream.unmute_track(track_id).await;
1302        Ok(())
1303    }
1304
1305    pub async fn cleanup(&self) -> Result<()> {
1306        self.call_state.write().await.tts_handle = None;
1307        self.media_stream.cleanup().await.ok();
1308        Ok(())
1309    }
1310
1311    pub fn get_callrecord(&self) -> Option<CallRecord> {
1312        self.call_state.try_read().ok().map(|call_state| {
1313            call_state.build_callrecord(
1314                self.app_state.clone(),
1315                self.session_id.clone(),
1316                self.call_type.clone(),
1317            )
1318        })
1319    }
1320
1321    async fn dump_to_file(
1322        &self,
1323        dump_file: &mut File,
1324        cmd_receiver: &mut CommandReceiver,
1325        event_receiver: &mut EventReceiver,
1326    ) {
1327        loop {
1328            select! {
1329                _ = self.cancel_token.cancelled() => {
1330                    break;
1331                }
1332                Ok(cmd) = cmd_receiver.recv() => {
1333                    CallRecordEvent::write(CallRecordEventType::Command, cmd, dump_file)
1334                        .await;
1335                }
1336                Ok(event) = event_receiver.recv() => {
1337                    if matches!(event, SessionEvent::Binary{..}) {
1338                        continue;
1339                    }
1340                    CallRecordEvent::write(CallRecordEventType::Event, event, dump_file)
1341                        .await;
1342                }
1343            };
1344        }
1345    }
1346
1347    async fn dump_loop(
1348        &self,
1349        dump_events: bool,
1350        mut dump_cmd_receiver: CommandReceiver,
1351        mut dump_event_receiver: EventReceiver,
1352    ) {
1353        if !dump_events {
1354            return;
1355        }
1356
1357        let file_name = self.app_state.get_dump_events_file(&self.session_id);
1358        let mut dump_file = match File::options()
1359            .create(true)
1360            .append(true)
1361            .open(&file_name)
1362            .await
1363        {
1364            Ok(file) => file,
1365            Err(e) => {
1366                warn!(
1367                    session_id = self.session_id,
1368                    file_name, "failed to open dump events file: {}", e
1369                );
1370                return;
1371            }
1372        };
1373        self.dump_to_file(
1374            &mut dump_file,
1375            &mut dump_cmd_receiver,
1376            &mut dump_event_receiver,
1377        )
1378        .await;
1379
1380        while let Ok(event) = dump_event_receiver.try_recv() {
1381            if matches!(event, SessionEvent::Binary { .. }) {
1382                continue;
1383            }
1384            CallRecordEvent::write(CallRecordEventType::Event, event, &mut dump_file).await;
1385        }
1386    }
1387
1388    pub async fn create_rtp_track(&self, track_id: TrackId, ssrc: u32) -> Result<RtcTrack> {
1389        let mut rtc_config = RtcTrackConfig::default();
1390        rtc_config.mode = rustrtc::TransportMode::Rtp;
1391
1392        if let Some(codecs) = &self.app_state.config.codecs {
1393            let mut codec_types = Vec::new();
1394            for c in codecs {
1395                match c.to_lowercase().as_str() {
1396                    "pcmu" => codec_types.push(CodecType::PCMU),
1397                    "pcma" => codec_types.push(CodecType::PCMA),
1398                    "g722" => codec_types.push(CodecType::G722),
1399                    "g729" => codec_types.push(CodecType::G729),
1400                    "opus" => codec_types.push(CodecType::Opus),
1401                    "dtmf" | "2833" | "telephone_event" => {
1402                        codec_types.push(CodecType::TelephoneEvent)
1403                    }
1404                    _ => {}
1405                }
1406            }
1407            if !codec_types.is_empty() {
1408                rtc_config.preferred_codec = Some(codec_types[0].clone());
1409                rtc_config.codecs = codec_types;
1410            }
1411        }
1412
1413        if rtc_config.preferred_codec.is_none() {
1414            rtc_config.preferred_codec = Some(self.track_config.codec.clone());
1415        }
1416
1417        rtc_config.rtp_port_range = self
1418            .app_state
1419            .config
1420            .rtp_start_port
1421            .zip(self.app_state.config.rtp_end_port);
1422
1423        if let Some(ref external_ip) = self.app_state.config.external_ip {
1424            rtc_config.external_ip = Some(external_ip.clone());
1425        }
1426
1427        rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
1428
1429        let mut track = RtcTrack::new(
1430            self.cancel_token.child_token(),
1431            track_id,
1432            self.track_config.clone(),
1433            rtc_config,
1434        )
1435        .with_ssrc(ssrc);
1436
1437        track.create().await?;
1438
1439        Ok(track)
1440    }
1441
1442    async fn setup_caller_track(&self, option: &CallOption) -> Result<()> {
1443        self.call_state.write().await.option = Some(option.clone());
1444        info!(
1445            session_id = self.session_id,
1446            call_type = ?self.call_type,
1447            "setup caller track"
1448        );
1449
1450        let track = match self.call_type {
1451            ActiveCallType::Webrtc => Some(self.create_webrtc_track().await?),
1452            ActiveCallType::WebSocket => {
1453                let audio_receiver = self.call_state.write().await.audio_receiver.take();
1454                if let Some(receiver) = audio_receiver {
1455                    Some(self.create_websocket_track(receiver).await?)
1456                } else {
1457                    None
1458                }
1459            }
1460            ActiveCallType::Sip => {
1461                if let Some(dialog_id) = self
1462                    .invitation
1463                    .find_dialog_id_by_session_id(&self.session_id)
1464                {
1465                    if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1466                        return self
1467                            .prepare_incoming_sip_track(
1468                                self.cancel_token.clone(),
1469                                self.call_state.clone(),
1470                                &self.session_id,
1471                                pending_dialog,
1472                            )
1473                            .await;
1474                    }
1475                }
1476
1477                // Auto-inject credentials from registered users if not already provided
1478                let mut option = option.clone();
1479                if option.sip.is_none()
1480                    || option
1481                        .sip
1482                        .as_ref()
1483                        .and_then(|s| s.username.as_ref())
1484                        .is_none()
1485                {
1486                    if let Some(callee) = &option.callee {
1487                        if let Some(cred) = self.app_state.find_credentials_for_callee(callee) {
1488                            if option.sip.is_none() {
1489                                option.sip = Some(crate::SipOption {
1490                                    username: Some(cred.username.clone()),
1491                                    password: Some(cred.password.clone()),
1492                                    realm: cred.realm.clone(),
1493                                    ..Default::default()
1494                                });
1495                            }
1496                        }
1497                    }
1498                }
1499
1500                let mut invite_option = option.build_invite_option()?;
1501                invite_option.call_id = Some(self.session_id.clone());
1502
1503                match self
1504                    .create_outgoing_sip_track(
1505                        self.cancel_token.clone(),
1506                        self.call_state.clone(),
1507                        &self.session_id,
1508                        invite_option,
1509                        &option,
1510                        None,
1511                        false,
1512                    )
1513                    .await
1514                {
1515                    Ok(answer) => {
1516                        self.event_sender
1517                            .send(SessionEvent::Answer {
1518                                timestamp: crate::media::get_timestamp(),
1519                                track_id: self.session_id.clone(),
1520                                sdp: answer,
1521                                refer: Some(false),
1522                            })
1523                            .ok();
1524                        return Ok(());
1525                    }
1526                    Err(e) => {
1527                        warn!(
1528                            session_id = self.session_id,
1529                            "failed to create sip track: {}", e
1530                        );
1531                        match &e {
1532                            rsipstack::Error::DialogError(reason, _, code) => {
1533                                self.event_sender
1534                                    .send(SessionEvent::Reject {
1535                                        track_id: self.session_id.clone(),
1536                                        timestamp: crate::media::get_timestamp(),
1537                                        reason: reason.clone(),
1538                                        code: Some(code.code() as u32),
1539                                        refer: Some(false),
1540                                    })
1541                                    .ok();
1542                            }
1543                            _ => {}
1544                        }
1545                        return Err(e.into());
1546                    }
1547                }
1548            }
1549            ActiveCallType::B2bua => {
1550                if let Some(dialog_id) = self
1551                    .invitation
1552                    .find_dialog_id_by_session_id(&self.session_id)
1553                {
1554                    if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1555                        return self
1556                            .prepare_incoming_sip_track(
1557                                self.cancel_token.clone(),
1558                                self.call_state.clone(),
1559                                &self.session_id,
1560                                pending_dialog,
1561                            )
1562                            .await;
1563                    }
1564                }
1565
1566                warn!(
1567                    session_id = self.session_id,
1568                    "no pending dialog found for B2BUA call"
1569                );
1570                return Err(anyhow::anyhow!(
1571                    "no pending dialog found for session_id: {}",
1572                    self.session_id
1573                ));
1574            }
1575        };
1576        match track {
1577            Some(track) => {
1578                self.finish_caller_stack(&option, Some(track)).await?;
1579            }
1580            None => {
1581                warn!(session_id = self.session_id, "no track created for caller");
1582                return Err(anyhow::anyhow!("no track created for caller"));
1583            }
1584        }
1585        Ok(())
1586    }
1587
1588    async fn finish_caller_stack(
1589        &self,
1590        option: &CallOption,
1591        track: Option<Box<dyn Track>>,
1592    ) -> Result<()> {
1593        if let Some(track) = track {
1594            self.setup_track_with_stream(&option, track).await?;
1595        }
1596
1597        {
1598            let call_state = self.call_state.read().await;
1599            if let Some(ref answer) = call_state.answer {
1600                info!(
1601                    session_id = self.session_id,
1602                    "sending answer event: {}", answer,
1603                );
1604                self.event_sender
1605                    .send(SessionEvent::Answer {
1606                        timestamp: crate::media::get_timestamp(),
1607                        track_id: self.session_id.clone(),
1608                        sdp: answer.clone(),
1609                        refer: Some(false),
1610                    })
1611                    .ok();
1612            } else {
1613                warn!(
1614                    session_id = self.session_id,
1615                    "no answer in state to send event"
1616                );
1617            }
1618        }
1619        Ok(())
1620    }
1621
1622    pub async fn setup_track_with_stream(
1623        &self,
1624        option: &CallOption,
1625        mut track: Box<dyn Track>,
1626    ) -> Result<()> {
1627        let processors = match StreamEngine::create_processors(
1628            self.app_state.stream_engine.clone(),
1629            track.as_ref(),
1630            self.cancel_token.child_token(),
1631            self.event_sender.clone(),
1632            self.media_stream.packet_sender.clone(),
1633            option,
1634        )
1635        .await
1636        {
1637            Ok(processors) => processors,
1638            Err(e) => {
1639                warn!(
1640                    session_id = self.session_id,
1641                    "failed to prepare stream processors: {}", e
1642                );
1643                vec![]
1644            }
1645        };
1646
1647        // Add all processors from the hook
1648        for processor in processors {
1649            track.append_processor(processor);
1650        }
1651
1652        self.update_track_wrapper(track, None).await;
1653        Ok(())
1654    }
1655
1656    pub async fn update_track_wrapper(&self, mut track: Box<dyn Track>, play_id: Option<String>) {
1657        let ambiance_opt = {
1658            let state = self.call_state.read().await;
1659            let mut opt = state
1660                .option
1661                .as_ref()
1662                .and_then(|o| o.ambiance.clone())
1663                .unwrap_or_default();
1664
1665            if let Some(global) = &self.app_state.config.ambiance {
1666                opt.merge(global);
1667            }
1668            opt
1669        };
1670        if track.id() == &self.server_side_track_id && ambiance_opt.path.is_some() {
1671            match AmbianceProcessor::new(ambiance_opt).await {
1672                Ok(ambiance) => {
1673                    info!(session_id = self.session_id, "loaded ambiance processor");
1674                    track.append_processor(Box::new(ambiance));
1675                }
1676                Err(e) => {
1677                    tracing::error!("failed to load ambiance wav {}", e);
1678                }
1679            }
1680        }
1681        self.call_state.write().await.current_play_id = play_id.clone();
1682        self.media_stream.update_track(track, play_id).await;
1683    }
1684
1685    pub async fn create_websocket_track(
1686        &self,
1687        audio_receiver: WebsocketBytesReceiver,
1688    ) -> Result<Box<dyn Track>> {
1689        let (ssrc, codec) = {
1690            let call_state = self.call_state.read().await;
1691            (
1692                call_state.ssrc,
1693                call_state
1694                    .option
1695                    .as_ref()
1696                    .map(|o| o.codec.clone())
1697                    .unwrap_or_default(),
1698            )
1699        };
1700
1701        let ws_track = WebsocketTrack::new(
1702            self.cancel_token.child_token(),
1703            self.session_id.clone(),
1704            self.track_config.clone(),
1705            self.event_sender.clone(),
1706            audio_receiver,
1707            codec,
1708            ssrc,
1709        );
1710
1711        {
1712            let mut call_state = self.call_state.write().await;
1713            call_state.answer_time = Some(Utc::now());
1714            call_state.answer = Some("".to_string());
1715            call_state.last_status_code = 200;
1716        }
1717
1718        Ok(Box::new(ws_track))
1719    }
1720
1721    pub(super) async fn create_webrtc_track(&self) -> Result<Box<dyn Track>> {
1722        let (ssrc, option) = {
1723            let call_state = self.call_state.read().await;
1724            (
1725                call_state.ssrc,
1726                call_state.option.clone().unwrap_or_default(),
1727            )
1728        };
1729
1730        let mut rtc_config = RtcTrackConfig::default();
1731        rtc_config.mode = rustrtc::TransportMode::WebRtc; // WebRTC
1732        rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1733
1734        if let Some(codecs) = &self.app_state.config.codecs {
1735            let mut codec_types = Vec::new();
1736            for c in codecs {
1737                match c.to_lowercase().as_str() {
1738                    "pcmu" => codec_types.push(CodecType::PCMU),
1739                    "pcma" => codec_types.push(CodecType::PCMA),
1740                    "g722" => codec_types.push(CodecType::G722),
1741                    "g729" => codec_types.push(CodecType::G729),
1742                    "opus" => codec_types.push(CodecType::Opus),
1743                    "dtmf" | "2833" | "telephone_event" => {
1744                        codec_types.push(CodecType::TelephoneEvent)
1745                    }
1746                    _ => {}
1747                }
1748            }
1749            if !codec_types.is_empty() {
1750                rtc_config.preferred_codec = Some(codec_types[0].clone());
1751                rtc_config.codecs = codec_types;
1752            }
1753        }
1754
1755        if let Some(ref external_ip) = self.app_state.config.external_ip {
1756            rtc_config.external_ip = Some(external_ip.clone());
1757        }
1758
1759        let mut webrtc_track = RtcTrack::new(
1760            self.cancel_token.child_token(),
1761            self.session_id.clone(),
1762            self.track_config.clone(),
1763            rtc_config,
1764        )
1765        .with_ssrc(ssrc);
1766
1767        let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1768        let offer = match option.enable_ipv6 {
1769            Some(false) | None => {
1770                strip_ipv6_candidates(option.offer.as_ref().unwrap_or(&"".to_string()))
1771            }
1772            _ => option.offer.clone().unwrap_or("".to_string()),
1773        };
1774        let answer: Option<String>;
1775        match webrtc_track.handshake(offer, timeout).await {
1776            Ok(answer_sdp) => {
1777                answer = match option.enable_ipv6 {
1778                    Some(false) | None => Some(strip_ipv6_candidates(&answer_sdp)),
1779                    Some(true) => Some(answer_sdp.to_string()),
1780                };
1781            }
1782            Err(e) => {
1783                warn!(session_id = self.session_id, "failed to setup track: {}", e);
1784                return Err(anyhow::anyhow!("Failed to setup track: {}", e));
1785            }
1786        }
1787
1788        {
1789            let mut call_state = self.call_state.write().await;
1790            call_state.answer_time = Some(Utc::now());
1791            call_state.answer = answer;
1792            call_state.last_status_code = 200;
1793        }
1794        Ok(Box::new(webrtc_track))
1795    }
1796
1797    async fn create_outgoing_sip_track(
1798        &self,
1799        cancel_token: CancellationToken,
1800        call_state_ref: ActiveCallStateRef,
1801        track_id: &String,
1802        mut invite_option: InviteOption,
1803        call_option: &CallOption,
1804        moh: Option<String>,
1805        auto_hangup: bool,
1806    ) -> Result<String, rsipstack::Error> {
1807        let ssrc = call_state_ref.read().await.ssrc;
1808        let rtp_track = self
1809            .create_rtp_track(track_id.clone(), ssrc)
1810            .await
1811            .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1812
1813        let offer = Some(
1814            rtp_track
1815                .local_description()
1816                .await
1817                .map_err(|e| rsipstack::Error::Error(e.to_string()))?,
1818        );
1819
1820        {
1821            let mut cs = call_state_ref.write().await;
1822            if let Some(o) = cs.option.as_mut() {
1823                o.offer = offer.clone();
1824            }
1825            cs.start_time = Utc::now();
1826        };
1827
1828        invite_option.offer = offer.clone().map(|s| s.into());
1829
1830        // Set contact to local SIP endpoint address if not already set explicitly
1831        // Check if contact is still default (no scheme set) or if host is localhost-like
1832        let needs_contact = invite_option.contact.scheme.is_none()
1833            || invite_option
1834                .contact
1835                .host_with_port
1836                .to_string()
1837                .starts_with("127.0.0.1");
1838
1839        if needs_contact {
1840            if let Some(addr) = self.invitation.dialog_layer.endpoint.get_addrs().first() {
1841                invite_option.contact = rsip::Uri {
1842                    scheme: Some(rsip::Scheme::Sip),
1843                    auth: None,
1844                    host_with_port: addr.addr.clone(),
1845                    params: vec![],
1846                    headers: vec![],
1847                };
1848            }
1849        }
1850
1851        let mut rtp_track_to_setup = Some(Box::new(rtp_track) as Box<dyn Track>);
1852
1853        if let Some(moh) = moh {
1854            let ssrc_and_moh = {
1855                let mut state = call_state_ref.write().await;
1856                state.moh = Some(moh.clone());
1857                if state.current_play_id.is_none() {
1858                    let ssrc = rand::random::<u32>();
1859                    Some((ssrc, moh.clone()))
1860                } else {
1861                    info!(
1862                        session_id = self.session_id,
1863                        "Something is playing, MOH will start after it ends"
1864                    );
1865                    None
1866                }
1867            };
1868
1869            if let Some((ssrc, moh_path)) = ssrc_and_moh {
1870                let file_track = FileTrack::new(self.server_side_track_id.clone())
1871                    .with_play_id(Some(moh_path.clone()))
1872                    .with_ssrc(ssrc)
1873                    .with_path(moh_path.clone())
1874                    .with_cancel_token(self.cancel_token.child_token());
1875                self.update_track_wrapper(Box::new(file_track), Some(moh_path))
1876                    .await;
1877            }
1878        } else {
1879            let track = rtp_track_to_setup.take().unwrap();
1880            self.setup_track_with_stream(&call_option, track)
1881                .await
1882                .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1883        }
1884
1885        info!(
1886            session_id = self.session_id,
1887            track_id,
1888            contact = %invite_option.contact,
1889            "invite {} -> {} offer: \n{}",
1890            invite_option.caller,
1891            invite_option.callee,
1892            offer.as_ref().map(|s| s.as_str()).unwrap_or("<NO OFFER>")
1893        );
1894
1895        let (dlg_state_sender, dlg_state_receiver) =
1896            self.invitation.dialog_layer.new_dialog_state_channel();
1897
1898        let states = InviteDialogStates {
1899            is_client: true,
1900            session_id: self.session_id.clone(),
1901            track_id: track_id.clone(),
1902            event_sender: self.event_sender.clone(),
1903            media_stream: self.media_stream.clone(),
1904            call_state: call_state_ref.clone(),
1905            cancel_token,
1906            terminated_reason: None,
1907            has_early_media: false,
1908        };
1909
1910        let mut client_dialog_handler =
1911            DialogStateReceiverGuard::new(self.invitation.dialog_layer.clone(), dlg_state_receiver);
1912
1913        crate::spawn(async move {
1914            client_dialog_handler.process_dialog(states).await;
1915        });
1916
1917        let (dialog_id, answer) = self
1918            .invitation
1919            .invite(invite_option, dlg_state_sender)
1920            .await?;
1921
1922        self.call_state.write().await.moh = None;
1923
1924        if let Some(track) = rtp_track_to_setup {
1925            self.setup_track_with_stream(&call_option, track)
1926                .await
1927                .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1928        }
1929
1930        let answer = match answer {
1931            Some(answer) => String::from_utf8_lossy(&answer).to_string(),
1932            None => {
1933                warn!(session_id = self.session_id, "no answer received");
1934                return Err(rsipstack::Error::DialogError(
1935                    "No answer received".to_string(),
1936                    dialog_id,
1937                    rsip::StatusCode::NotAcceptableHere,
1938                ));
1939            }
1940        };
1941
1942        {
1943            let mut cs = call_state_ref.write().await;
1944            if cs.answer.is_none() {
1945                cs.answer = Some(answer.clone());
1946            }
1947            if auto_hangup {
1948                cs.auto_hangup = Some((ssrc, CallRecordHangupReason::ByRefer));
1949            }
1950        }
1951
1952        self.media_stream
1953            .update_remote_description(&track_id, &answer)
1954            .await
1955            .ok();
1956
1957        Ok(answer)
1958    }
1959
1960    /// Detect if SDP is WebRTC format
1961    pub fn is_webrtc_sdp(sdp: &str) -> bool {
1962        (sdp.contains("a=ice-ufrag:") || sdp.contains("a=ice-pwd:"))
1963            && sdp.contains("a=fingerprint:")
1964    }
1965
1966    pub async fn setup_answer_track(
1967        &self,
1968        ssrc: u32,
1969        option: &CallOption,
1970        offer: String,
1971    ) -> Result<(String, Box<dyn Track>)> {
1972        let offer = match option.enable_ipv6 {
1973            Some(false) | None => strip_ipv6_candidates(&offer),
1974            _ => offer.clone(),
1975        };
1976
1977        let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1978
1979        let mut media_track = if Self::is_webrtc_sdp(&offer) {
1980            let mut rtc_config = RtcTrackConfig::default();
1981            rtc_config.mode = rustrtc::TransportMode::WebRtc;
1982            rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1983            if let Some(ref external_ip) = self.app_state.config.external_ip {
1984                rtc_config.external_ip = Some(external_ip.clone());
1985            }
1986            rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
1987
1988            let webrtc_track = RtcTrack::new(
1989                self.cancel_token.child_token(),
1990                self.session_id.clone(),
1991                self.track_config.clone(),
1992                rtc_config,
1993            )
1994            .with_ssrc(ssrc);
1995
1996            Box::new(webrtc_track) as Box<dyn Track>
1997        } else {
1998            let rtp_track = self.create_rtp_track(self.session_id.clone(), ssrc).await?;
1999            Box::new(rtp_track) as Box<dyn Track>
2000        };
2001
2002        let answer = match media_track.handshake(offer.clone(), timeout).await {
2003            Ok(answer) => answer,
2004            Err(e) => {
2005                return Err(anyhow::anyhow!("handshake failed: {e}"));
2006            }
2007        };
2008
2009        return Ok((answer, media_track));
2010    }
2011
2012    pub async fn prepare_incoming_sip_track(
2013        &self,
2014        cancel_token: CancellationToken,
2015        call_state_ref: ActiveCallStateRef,
2016        track_id: &String,
2017        pending_dialog: PendingDialog,
2018    ) -> Result<()> {
2019        let state_receiver = pending_dialog.state_receiver;
2020        //let pending_token_clone = pending_dialog.token;
2021
2022        let states = InviteDialogStates {
2023            is_client: false,
2024            session_id: self.session_id.clone(),
2025            track_id: track_id.clone(),
2026            event_sender: self.event_sender.clone(),
2027            media_stream: self.media_stream.clone(),
2028            call_state: self.call_state.clone(),
2029            cancel_token,
2030            terminated_reason: None,
2031            has_early_media: false,
2032        };
2033
2034        let initial_request = pending_dialog.dialog.initial_request();
2035        let offer = String::from_utf8_lossy(&initial_request.body).to_string();
2036
2037        let (ssrc, option) = {
2038            let call_state = call_state_ref.read().await;
2039            (
2040                call_state.ssrc,
2041                call_state.option.clone().unwrap_or_default(),
2042            )
2043        };
2044
2045        match self.setup_answer_track(ssrc, &option, offer).await {
2046            Ok((offer, track)) => {
2047                self.setup_track_with_stream(&option, track).await?;
2048                {
2049                    let mut state = self.call_state.write().await;
2050                    state.ready_to_answer = Some((offer, None, pending_dialog.dialog));
2051                }
2052            }
2053            Err(e) => {
2054                return Err(anyhow::anyhow!("error creating track: {}", e));
2055            }
2056        }
2057
2058        let mut client_dialog_handler =
2059            DialogStateReceiverGuard::new(self.invitation.dialog_layer.clone(), state_receiver);
2060
2061        crate::spawn(async move {
2062            client_dialog_handler.process_dialog(states).await;
2063        });
2064        Ok(())
2065    }
2066}
2067
2068impl Drop for ActiveCall {
2069    fn drop(&mut self) {
2070        info!(session_id = self.session_id, "dropping active call");
2071        if let Some(sender) = self.app_state.callrecord_sender.as_ref() {
2072            if let Some(record) = self.get_callrecord() {
2073                if let Err(e) = sender.send(record) {
2074                    warn!(
2075                        session_id = self.session_id,
2076                        "failed to send call record: {}", e
2077                    );
2078                }
2079            }
2080        }
2081    }
2082}
2083
2084impl ActiveCallState {
2085    pub fn merge_option(&self, mut option: CallOption) -> CallOption {
2086        if let Some(existing) = &self.option {
2087            if option.asr.is_none() {
2088                option.asr = existing.asr.clone();
2089            }
2090            if option.tts.is_none() {
2091                option.tts = existing.tts.clone();
2092            }
2093            if option.vad.is_none() {
2094                option.vad = existing.vad.clone();
2095            }
2096            if option.denoise.is_none() {
2097                option.denoise = existing.denoise;
2098            }
2099            if option.recorder.is_none() {
2100                option.recorder = existing.recorder.clone();
2101            }
2102            if option.eou.is_none() {
2103                option.eou = existing.eou.clone();
2104            }
2105            if option.extra.is_none() {
2106                option.extra = existing.extra.clone();
2107            }
2108            if option.ambiance.is_none() {
2109                option.ambiance = existing.ambiance.clone();
2110            }
2111        }
2112        option
2113    }
2114
2115    pub fn set_hangup_reason(&mut self, reason: CallRecordHangupReason) {
2116        if self.hangup_reason.is_none() {
2117            self.hangup_reason = Some(reason);
2118        }
2119    }
2120
2121    pub fn build_hangup_event(
2122        &self,
2123        track_id: TrackId,
2124        initiator: Option<String>,
2125    ) -> crate::event::SessionEvent {
2126        let from = self.option.as_ref().and_then(|o| o.caller.as_ref());
2127        let to = self.option.as_ref().and_then(|o| o.callee.as_ref());
2128        let extra = self.extras.clone();
2129
2130        crate::event::SessionEvent::Hangup {
2131            track_id,
2132            timestamp: crate::media::get_timestamp(),
2133            reason: Some(format!("{:?}", self.hangup_reason)),
2134            initiator,
2135            start_time: self.start_time.to_rfc3339(),
2136            answer_time: self.answer_time.map(|t| t.to_rfc3339()),
2137            ringing_time: self.ring_time.map(|t| t.to_rfc3339()),
2138            hangup_time: Utc::now().to_rfc3339(),
2139            extra,
2140            from: from.map(|f| f.into()),
2141            to: to.map(|f| f.into()),
2142            refer: Some(self.is_refer),
2143        }
2144    }
2145
2146    pub fn build_callrecord(
2147        &self,
2148        app_state: AppState,
2149        session_id: String,
2150        call_type: ActiveCallType,
2151    ) -> CallRecord {
2152        let option = self.option.clone().unwrap_or_default();
2153        let recorder = if option.recorder.is_some() {
2154            let recorder_file = app_state.get_recorder_file(&session_id);
2155            if std::path::Path::new(&recorder_file).exists() {
2156                let file_size = std::fs::metadata(&recorder_file)
2157                    .map(|m| m.len())
2158                    .unwrap_or(0);
2159                vec![crate::callrecord::CallRecordMedia {
2160                    track_id: session_id.clone(),
2161                    path: recorder_file,
2162                    size: file_size,
2163                    extra: None,
2164                }]
2165            } else {
2166                vec![]
2167            }
2168        } else {
2169            vec![]
2170        };
2171
2172        let dump_event_file = app_state.get_dump_events_file(&session_id);
2173        let dump_event_file = if std::path::Path::new(&dump_event_file).exists() {
2174            Some(dump_event_file)
2175        } else {
2176            None
2177        };
2178
2179        let refer_callrecord = self.refer_callstate.as_ref().and_then(|rc| {
2180            if let Ok(rc) = rc.try_read() {
2181                Some(Box::new(rc.build_callrecord(
2182                    app_state.clone(),
2183                    rc.session_id.clone(),
2184                    ActiveCallType::B2bua,
2185                )))
2186            } else {
2187                None
2188            }
2189        });
2190
2191        let caller = option.caller.clone().unwrap_or_default();
2192        let callee = option.callee.clone().unwrap_or_default();
2193
2194        CallRecord {
2195            option: Some(option),
2196            call_id: session_id,
2197            call_type,
2198            start_time: self.start_time,
2199            ring_time: self.ring_time.clone(),
2200            answer_time: self.answer_time.clone(),
2201            end_time: Utc::now(),
2202            caller,
2203            callee,
2204            hangup_reason: self.hangup_reason.clone(),
2205            hangup_messages: Vec::new(),
2206            status_code: self.last_status_code,
2207            extras: self.extras.clone(),
2208            dump_event_file,
2209            recorder,
2210            refer_callrecord,
2211        }
2212    }
2213}