active_call/call/
active_call.rs

1use super::Command;
2use crate::{
3    CallOption, ReferOption,
4    event::{EventReceiver, EventSender, SessionEvent},
5    media::{
6        TrackId,
7        ambiance::AmbianceProcessor,
8        engine::StreamEngine,
9        negotiate::strip_ipv6_candidates,
10        recorder::RecorderOption,
11        stream::{MediaStream, MediaStreamBuilder},
12        track::{
13            Track, TrackConfig,
14            file::FileTrack,
15            media_pass::MediaPassTrack,
16            rtc::{RtcTrack, RtcTrackConfig},
17            tts::SynthesisHandle,
18            websocket::{WebsocketBytesReceiver, WebsocketTrack},
19        },
20    },
21    synthesis::{SynthesisCommand, SynthesisOption},
22};
23use crate::{
24    app::AppState,
25    call::{
26        CommandReceiver, CommandSender,
27        sip::{DialogStateReceiverGuard, Invitation, InviteDialogStates},
28    },
29    callrecord::{CallRecord, CallRecordEvent, CallRecordEventType, CallRecordHangupReason},
30    useragent::invitation::PendingDialog,
31};
32use anyhow::Result;
33use audio_codec::CodecType;
34use chrono::{DateTime, Utc};
35use rsipstack::dialog::{invitation::InviteOption, server_dialog::ServerInviteDialog};
36use serde::{Deserialize, Serialize};
37use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
38use tokio::{fs::File, select, sync::Mutex, sync::RwLock, time::sleep};
39use tokio_util::sync::CancellationToken;
40use tracing::{debug, info, warn};
41
#[cfg(test)]
mod tests {
    use super::*;
    use crate::app::AppStateBuilder;
    use crate::callrecord::CallRecordHangupReason;
    use crate::config::Config;
    use crate::media::track::tts::SynthesisHandle;
    use crate::synthesis::SynthesisCommand;
    use tokio::sync::mpsc;

    /// Builds a SIP `ActiveCall` on top of a freshly built application state
    /// and stores `option` as the call's option.
    async fn sip_call_with_option(option: crate::CallOption) -> Result<Arc<ActiveCall>> {
        let mut config = Config::default();
        config.udp_port = 0; // Use random port
        config.media_cache_path = "/tmp/mediacache".to_string();
        let app_state = AppStateBuilder::new()
            .with_config(config)
            .with_stream_engine(Arc::new(StreamEngine::default()))
            .build()
            .await?;

        let call = Arc::new(ActiveCall::new(
            ActiveCallType::Sip,
            CancellationToken::new(),
            "test-session".to_string(),
            app_state.invitation.clone(),
            app_state.clone(),
            TrackConfig::default(),
            None,
            false,
            None,
            None,
        ));
        call.call_state.write().await.option = Some(option);
        Ok(call)
    }

    /// Registers `handle` as the call's TTS handle and marks `play_id` as the
    /// playback currently in progress.
    async fn install_tts_handle(call: &ActiveCall, handle: SynthesisHandle, play_id: &str) {
        let mut state = call.call_state.write().await;
        state.tts_handle = Some(handle);
        state.current_play_id = Some(play_id.to_string());
    }

    /// A TTS command carrying the play id of the existing handle must reuse
    /// that handle: auto hangup is bound to its SSRC and the command goes
    /// through its channel.
    #[tokio::test]
    async fn test_tts_ssrc_reuse_for_autohangup() -> Result<()> {
        let mut option = crate::CallOption::default();
        option.tts = Some(crate::synthesis::SynthesisOption::default());
        let active_call = sip_call_with_option(option).await?;

        // 1. Set initial TTS handle
        let initial_ssrc = 12345;
        let (tx, mut rx) = mpsc::unbounded_channel::<SynthesisCommand>();
        let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);
        install_tts_handle(&active_call, handle, "play_1").await;

        // 2. Call do_tts with auto_hangup=true and same play_id
        active_call
            .do_tts(
                "hangup now".to_string(),
                None,
                Some("play_1".to_string()),
                Some(true),
                false,
                true,
                None,
                None,
                false,
            )
            .await?;

        // 3. Verify auto_hangup state uses the EXISTING SSRC
        {
            let state = active_call.call_state.read().await;
            assert!(state.auto_hangup.is_some());
            let (h_ssrc, reason) = state.auto_hangup.clone().unwrap();
            assert_eq!(
                h_ssrc, initial_ssrc,
                "SSRC should be reused from existing handle"
            );
            assert_eq!(reason, CallRecordHangupReason::BySystem);
        }

        // 4. Verify command was sent to existing channel
        let cmd = rx.try_recv().expect("Should have received tts command");
        assert_eq!(cmd.text, "hangup now");

        Ok(())
    }

    /// A TTS command with a different play id must not reuse the SSRC of the
    /// handle that is already installed.
    #[tokio::test]
    async fn test_tts_new_ssrc_for_different_play_id() -> Result<()> {
        let mut tts_opt = crate::synthesis::SynthesisOption::default();
        tts_opt.provider = Some(crate::synthesis::SynthesisType::MsEdge);
        let mut option = crate::CallOption::default();
        option.tts = Some(tts_opt);
        let active_call = sip_call_with_option(option).await?;

        // `_rx` stays bound so the channel remains open for the whole test.
        let initial_ssrc = 111;
        let (tx, _rx) = mpsc::unbounded_channel();
        let handle = SynthesisHandle::new(tx, Some("play_1".to_string()), initial_ssrc);
        install_tts_handle(&active_call, handle, "play_1").await;

        // Call do_tts with DIFFERENT play_id
        active_call
            .do_tts(
                "new play".to_string(),
                None,
                Some("play_2".to_string()),
                Some(true),
                false,
                true,
                None,
                None,
                false,
            )
            .await?;

        // Verify auto_hangup uses a NEW SSRC (because it should interrupt and start fresh)
        {
            let state = active_call.call_state.read().await;
            let (h_ssrc, _) = state.auto_hangup.clone().unwrap();
            assert_ne!(
                h_ssrc, initial_ssrc,
                "Should use a new SSRC for different play_id"
            );
        }

        Ok(())
    }
}
206
/// Deserializable call parameters.
///
/// Field names are camelCase on the wire unless a field carries its own
/// `rename`. NOTE(review): presumably supplied by the client when a call is
/// opened — confirm against the handler that deserializes this type.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallParams {
    /// Optional identifier. NOTE(review): presumably the requested session id — confirm.
    pub id: Option<String>,
    /// Wire name `dump`. NOTE(review): presumably feeds `ActiveCall::dump_events` — confirm.
    #[serde(rename = "dump")]
    pub dump_events: Option<bool>,
    /// Wire name `ping`. The unit of the interval is not visible here.
    #[serde(rename = "ping")]
    pub ping_interval: Option<u32>,
    /// Wire name `serverSideTrack`. NOTE(review): presumably overrides the
    /// server-side track id of the call — confirm.
    pub server_side_track: Option<String>,
}
217
/// Kind of an active call.
///
/// Variants are (de)serialized with camelCase names; `Sip` is the default.
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
#[serde(rename_all = "camelCase")]
pub enum ActiveCallType {
    Webrtc,
    B2bua,
    WebSocket,
    #[default]
    Sip,
}
227
/// Mutable per-call state, shared behind an `RwLock` (see `ActiveCallStateRef`).
#[derive(Default)]
pub struct ActiveCallState {
    /// Session id of the owning call.
    pub session_id: String,
    /// Set to the current UTC time when the `ActiveCall` is constructed.
    pub start_time: DateTime<Utc>,
    pub ring_time: Option<DateTime<Utc>>,
    /// Set when a pending dialog has been accepted successfully.
    pub answer_time: Option<DateTime<Utc>>,
    pub hangup_reason: Option<CallRecordHangupReason>,
    pub last_status_code: u16,
    /// Call option currently in effect for this call.
    pub option: Option<CallOption>,
    /// Answer body that was sent when the call was accepted.
    pub answer: Option<String>,
    /// Initialised with a random value when the `ActiveCall` is constructed.
    pub ssrc: u32,
    pub refer_callstate: Option<ActiveCallStateRef>,
    /// Caller-supplied extra values, stored as passed to `ActiveCall::new`.
    pub extras: Option<HashMap<String, serde_json::Value>>,
    pub is_refer: bool,

    // Runtime state (migrated from ActiveCall to reduce multiple locks)
    /// Handle of the TTS synthesis currently attached to the call, if any.
    pub tts_handle: Option<SynthesisHandle>,
    /// `(ssrc, reason)`: when the server-side track with this SSRC ends, the
    /// call is hung up with `reason`.
    pub auto_hangup: Option<(u32, CallRecordHangupReason)>,
    /// Taken when the current playback ends to arm the wait-for-input timer.
    pub wait_input_timeout: Option<u32>,
    /// Path of the music-on-hold file that is replayed whenever the
    /// server-side track ends or reports an error.
    pub moh: Option<String>,
    /// Play id of the playback in progress; track-end events carrying a
    /// different play id are ignored.
    pub current_play_id: Option<String>,
    /// Receiver handed to `ActiveCall::new`.
    /// NOTE(review): presumably audio bytes for WebSocket calls — confirm.
    pub audio_receiver: Option<WebsocketBytesReceiver>,
    /// `(answer, optional track, server dialog)` prepared in advance and
    /// consumed when the call is accepted.
    pub ready_to_answer: Option<(String, Option<Box<dyn Track>>, ServerInviteDialog)>,
}
252
/// Shared, reference-counted handle to an `ActiveCall`.
pub type ActiveCallRef = Arc<ActiveCall>;
/// Shared, lock-protected handle to an `ActiveCallState`.
pub type ActiveCallStateRef = Arc<RwLock<ActiveCallState>>;
255
/// A single call in progress: its shared state, media stream, event and
/// command channels, and the handles needed to control it.
pub struct ActiveCall {
    /// Mutable call state shared behind an `RwLock`.
    pub call_state: ActiveCallStateRef,
    /// Cancelled when the call ends; child tokens are handed to tracks and the media stream.
    pub cancel_token: CancellationToken,
    pub call_type: ActiveCallType,
    pub session_id: String,
    /// Media stream built with the session id as its id.
    pub media_stream: Arc<MediaStream>,
    pub track_config: TrackConfig,
    /// Sender side of the session event channel.
    pub event_sender: EventSender,
    pub app_state: AppState,
    pub invitation: Invitation,
    /// Broadcast sender used to enqueue commands for this call.
    pub cmd_sender: CommandSender,
    /// Passed to the dump loop started by `serve`.
    pub dump_events: bool,
    /// Id of the server-side track; `"server-side-track"` when none was supplied to `new`.
    pub server_side_track_id: TrackId,
}
270
/// Guard that keeps a call registered in `app_state.active_calls` for as long
/// as the guard is alive; the entry is removed when the guard is dropped.
pub struct ActiveCallGuard {
    /// The registered call.
    pub call: ActiveCallRef,
    /// Number of entries in the registry right after this call was inserted.
    pub active_calls: usize,
}
275
276impl ActiveCallGuard {
277    pub fn new(call: ActiveCallRef) -> Self {
278        let active_calls = {
279            call.app_state
280                .total_calls
281                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
282            let mut calls = call.app_state.active_calls.lock().unwrap();
283            calls.insert(call.session_id.clone(), call.clone());
284            calls.len()
285        };
286        Self { call, active_calls }
287    }
288}
289
290impl Drop for ActiveCallGuard {
291    fn drop(&mut self) {
292        self.call
293            .app_state
294            .active_calls
295            .lock()
296            .unwrap()
297            .remove(&self.call.session_id);
298    }
299}
300
/// Bundle of receivers created by `ActiveCall::new_receiver` and consumed by
/// `ActiveCall::serve`.
pub struct ActiveCallReceiver {
    /// Commands to be dispatched by the command loop.
    pub cmd_receiver: CommandReceiver,
    /// Second subscription to the same command channel, handed to the dump loop.
    pub dump_cmd_receiver: CommandReceiver,
    /// Subscription to the session event channel, handed to the dump loop.
    pub dump_event_receiver: EventReceiver,
}
306
307impl ActiveCall {
308    pub fn new(
309        call_type: ActiveCallType,
310        cancel_token: CancellationToken,
311        session_id: String,
312        invitation: Invitation,
313        app_state: AppState,
314        track_config: TrackConfig,
315        audio_receiver: Option<WebsocketBytesReceiver>,
316        dump_events: bool,
317        server_side_track_id: Option<TrackId>,
318        extras: Option<HashMap<String, serde_json::Value>>,
319    ) -> Self {
320        let event_sender = crate::event::create_event_sender();
321        let cmd_sender = tokio::sync::broadcast::Sender::<Command>::new(32);
322        let media_stream_builder = MediaStreamBuilder::new(event_sender.clone())
323            .with_id(session_id.clone())
324            .with_cancel_token(cancel_token.child_token());
325        let media_stream = Arc::new(media_stream_builder.build());
326        let call_state = Arc::new(RwLock::new(ActiveCallState {
327            session_id: session_id.clone(),
328            start_time: Utc::now(),
329            ssrc: rand::random::<u32>(),
330            extras,
331            audio_receiver,
332            ..Default::default()
333        }));
334        Self {
335            cancel_token,
336            call_type,
337            session_id,
338            call_state,
339            media_stream,
340            track_config,
341            event_sender,
342            app_state,
343            invitation,
344            cmd_sender,
345            dump_events,
346            server_side_track_id: server_side_track_id.unwrap_or("server-side-track".to_string()),
347        }
348    }
349
350    pub async fn enqueue_command(&self, command: Command) -> Result<()> {
351        self.cmd_sender
352            .send(command)
353            .map_err(|e| anyhow::anyhow!("Failed to send command: {}", e))?;
354        Ok(())
355    }
356
357    /// Create a new ActiveCallReceiver for this ActiveCall
358    /// `tokio::sync::broadcast` not cached messages, so need to early create receiver
359    /// before calling `serve()`
360    pub fn new_receiver(&self) -> ActiveCallReceiver {
361        ActiveCallReceiver {
362            cmd_receiver: self.cmd_sender.subscribe(),
363            dump_cmd_receiver: self.cmd_sender.subscribe(),
364            dump_event_receiver: self.event_sender.subscribe(),
365        }
366    }
367
368    pub async fn serve(&self, receiver: ActiveCallReceiver) -> Result<()> {
369        let ActiveCallReceiver {
370            mut cmd_receiver,
371            dump_cmd_receiver,
372            dump_event_receiver,
373        } = receiver;
374
375        let process_command_loop = async move {
376            while let Ok(command) = cmd_receiver.recv().await {
377                match self.dispatch(command).await {
378                    Ok(_) => (),
379                    Err(e) => {
380                        warn!(session_id = self.session_id, "{}", e);
381                        self.event_sender
382                            .send(SessionEvent::Error {
383                                track_id: self.session_id.clone(),
384                                timestamp: crate::media::get_timestamp(),
385                                sender: "command".to_string(),
386                                error: e.to_string(),
387                                code: None,
388                            })
389                            .ok();
390                    }
391                }
392            }
393        };
394        self.app_state
395            .total_calls
396            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
397
398        tokio::join!(
399            self.dump_loop(self.dump_events, dump_cmd_receiver, dump_event_receiver),
400            async {
401                select! {
402                    _ = process_command_loop => {
403                        info!(session_id = self.session_id, "command loop done");
404                    }
405                    _ = self.process() => {
406                        info!(session_id = self.session_id, "call serve done");
407                    }
408                    _ = self.cancel_token.cancelled() => {
409                        info!(session_id = self.session_id, "call cancelled - cleaning up resources");
410                    }
411                }
412                self.cancel_token.cancel();
413            }
414        );
415        Ok(())
416    }
417
418    async fn process(&self) -> Result<()> {
419        let mut event_receiver = self.event_sender.subscribe();
420
421        let input_timeout_expire = Arc::new(Mutex::new((0u64, 0u32)));
422        let input_timeout_expire_ref = input_timeout_expire.clone();
423        let event_sender = self.event_sender.clone();
424        let wait_input_timeout_loop = async {
425            loop {
426                let (start_time, expire) = { *input_timeout_expire.lock().await };
427                if expire > 0 && crate::media::get_timestamp() >= start_time + expire as u64 {
428                    info!(session_id = self.session_id, "wait input timeout reached");
429                    *input_timeout_expire.lock().await = (0, 0);
430                    event_sender
431                        .send(SessionEvent::Silence {
432                            track_id: self.server_side_track_id.clone(),
433                            timestamp: crate::media::get_timestamp(),
434                            start_time,
435                            duration: expire as u64,
436                            samples: None,
437                        })
438                        .ok();
439                }
440                sleep(Duration::from_millis(100)).await;
441            }
442        };
443        let server_side_track_id = self.server_side_track_id.clone();
444        let event_hook_loop = async move {
445            while let Ok(event) = event_receiver.recv().await {
446                match event {
447                    SessionEvent::Speaking { .. }
448                    | SessionEvent::Dtmf { .. }
449                    | SessionEvent::AsrDelta { .. }
450                    | SessionEvent::AsrFinal { .. }
451                    | SessionEvent::TrackStart { .. } => {
452                        *input_timeout_expire_ref.lock().await = (0, 0);
453                    }
454                    SessionEvent::TrackEnd {
455                        track_id,
456                        play_id,
457                        ssrc,
458                        ..
459                    } => {
460                        if track_id != server_side_track_id {
461                            continue;
462                        }
463
464                        let (moh_path, auto_hangup, wait_timeout_val) = {
465                            let mut state = self.call_state.write().await;
466                            if play_id != state.current_play_id {
467                                debug!(
468                                    session_id = self.session_id,
469                                    ?play_id,
470                                    current = ?state.current_play_id,
471                                    "ignoring interrupted track end"
472                                );
473                                continue;
474                            }
475                            state.current_play_id = None;
476                            (
477                                state.moh.clone(),
478                                state.auto_hangup.clone(),
479                                state.wait_input_timeout.take(),
480                            )
481                        };
482
483                        if let Some(path) = moh_path {
484                            info!(session_id = self.session_id, "looping moh: {}", path);
485                            let ssrc = rand::random::<u32>();
486                            let file_track = FileTrack::new(self.server_side_track_id.clone())
487                                .with_play_id(Some(path.clone()))
488                                .with_ssrc(ssrc)
489                                .with_path(path.clone())
490                                .with_cancel_token(self.cancel_token.child_token());
491                            self.update_track_wrapper(Box::new(file_track), Some(path))
492                                .await;
493                            continue;
494                        }
495
496                        if let Some((hangup_ssrc, hangup_reason)) = auto_hangup {
497                            if hangup_ssrc == ssrc {
498                                info!(
499                                    session_id = self.session_id,
500                                    ssrc, "auto hangup when track end track_id:{}", track_id
501                                );
502                                self.do_hangup(Some(hangup_reason), None).await.ok();
503                            }
504                        }
505
506                        if let Some(timeout) = wait_timeout_val {
507                            let expire = if timeout > 0 {
508                                (crate::media::get_timestamp(), timeout)
509                            } else {
510                                (0, 0)
511                            };
512                            *input_timeout_expire_ref.lock().await = expire;
513                        }
514                    }
515                    SessionEvent::Interrupt { receiver } => {
516                        let track_id =
517                            receiver.unwrap_or_else(|| self.server_side_track_id.clone());
518                        if track_id == self.server_side_track_id {
519                            debug!(
520                                session_id = self.session_id,
521                                "received interrupt event, stopping playback"
522                            );
523                            self.do_interrupt(true).await.ok();
524                        }
525                    }
526                    SessionEvent::Inactivity { track_id, .. } => {
527                        info!(
528                            session_id = self.session_id,
529                            track_id, "inactivity timeout reached, hanging up"
530                        );
531                        self.do_hangup(Some(CallRecordHangupReason::InactivityTimeout), None)
532                            .await
533                            .ok();
534                    }
535                    SessionEvent::Error { track_id, .. } => {
536                        if track_id != server_side_track_id {
537                            continue;
538                        }
539
540                        let moh_info = {
541                            let mut state = self.call_state.write().await;
542                            if let Some(path) = state.moh.clone() {
543                                let fallback = "./config/sounds/refer_moh.wav".to_string();
544                                let next_path = if path != fallback
545                                    && std::path::Path::new(&fallback).exists()
546                                {
547                                    info!(
548                                        session_id = self.session_id,
549                                        "moh error, switching to fallback: {}", fallback
550                                    );
551                                    state.moh = Some(fallback.clone());
552                                    fallback
553                                } else {
554                                    info!(
555                                        session_id = self.session_id,
556                                        "looping moh on error: {}", path
557                                    );
558                                    path
559                                };
560                                Some(next_path)
561                            } else {
562                                None
563                            }
564                        };
565
566                        if let Some(next_path) = moh_info {
567                            let ssrc = rand::random::<u32>();
568                            let file_track = FileTrack::new(self.server_side_track_id.clone())
569                                .with_play_id(Some(next_path.clone()))
570                                .with_ssrc(ssrc)
571                                .with_path(next_path.clone())
572                                .with_cancel_token(self.cancel_token.child_token());
573                            self.update_track_wrapper(Box::new(file_track), Some(next_path))
574                                .await;
575                            continue;
576                        }
577                    }
578                    _ => {}
579                }
580            }
581        };
582
583        select! {
584            _ = wait_input_timeout_loop=>{
585                info!(session_id = self.session_id, "wait input timeout loop done");
586            }
587            _ = self.media_stream.serve() => {
588                info!(session_id = self.session_id, "media stream loop done");
589            }
590            _ = event_hook_loop => {
591                info!(session_id = self.session_id, "event loop done");
592            }
593        }
594        Ok(())
595    }
596
597    async fn dispatch(&self, command: Command) -> Result<()> {
598        match command {
599            Command::Invite { option } => self.do_invite(option).await,
600            Command::Accept { option } => self.do_accept(option).await,
601            Command::Reject { reason, code } => {
602                self.do_reject(code.map(|c| (c as u16).into()), Some(reason))
603                    .await
604            }
605            Command::Ringing {
606                ringtone,
607                recorder,
608                early_media,
609            } => self.do_ringing(ringtone, recorder, early_media).await,
610            Command::Tts {
611                text,
612                speaker,
613                play_id,
614                auto_hangup,
615                streaming,
616                end_of_stream,
617                option,
618                wait_input_timeout,
619                base64,
620            } => {
621                self.do_tts(
622                    text,
623                    speaker,
624                    play_id,
625                    auto_hangup,
626                    streaming.unwrap_or_default(),
627                    end_of_stream.unwrap_or_default(),
628                    option,
629                    wait_input_timeout,
630                    base64.unwrap_or_default(),
631                )
632                .await
633            }
634            Command::Play {
635                url,
636                play_id,
637                auto_hangup,
638                wait_input_timeout,
639            } => {
640                self.do_play(url, play_id, auto_hangup, wait_input_timeout)
641                    .await
642            }
643            Command::Hangup { reason, initiator } => {
644                let reason = reason.map(|r| {
645                    r.parse::<CallRecordHangupReason>()
646                        .unwrap_or(CallRecordHangupReason::BySystem)
647                });
648                self.do_hangup(reason, initiator).await
649            }
650            Command::Refer {
651                caller,
652                callee,
653                options,
654            } => self.do_refer(caller, callee, options).await,
655            Command::Mute { track_id } => self.do_mute(track_id).await,
656            Command::Unmute { track_id } => self.do_unmute(track_id).await,
657            Command::Pause {} => self.do_pause().await,
658            Command::Resume {} => self.do_resume().await,
659            Command::Interrupt {
660                graceful: passage,
661                fade_out_ms: _,
662            } => self.do_interrupt(passage.unwrap_or_default()).await,
663            Command::History { speaker, text } => self.do_history(speaker, text).await,
664        }
665    }
666
667    fn build_record_option(&self, option: &CallOption) -> Option<RecorderOption> {
668        if let Some(recorder_option) = &option.recorder {
669            let mut recorder_file = recorder_option.recorder_file.clone();
670            if recorder_file.contains("{id}") {
671                recorder_file = recorder_file.replace("{id}", &self.session_id);
672            }
673
674            let recorder_file = if recorder_file.is_empty() {
675                self.app_state.get_recorder_file(&self.session_id)
676            } else {
677                let p = Path::new(&recorder_file);
678                p.is_absolute()
679                    .then(|| recorder_file.clone())
680                    .unwrap_or_else(|| self.app_state.get_recorder_file(&recorder_file))
681            };
682            info!(
683                session_id = self.session_id,
684                recorder_file, "created recording file"
685            );
686
687            let track_samplerate = self.track_config.samplerate;
688            let recorder_samplerate = if track_samplerate > 0 {
689                track_samplerate
690            } else {
691                recorder_option.samplerate
692            };
693            let recorder_ptime = if recorder_option.ptime == 0 {
694                200
695            } else {
696                recorder_option.ptime
697            };
698            let requested_format = recorder_option
699                .format
700                .unwrap_or(self.app_state.config.recorder_format());
701            let format = requested_format.effective();
702            if requested_format != format {
703                warn!(
704                    session_id = self.session_id,
705                    requested = requested_format.extension(),
706                    "Recorder format fallback to wav due to unsupported feature"
707                );
708            }
709            let mut recorder_config = RecorderOption {
710                recorder_file,
711                samplerate: recorder_samplerate,
712                ptime: recorder_ptime,
713                format: Some(format),
714            };
715            recorder_config.ensure_path_extension(format);
716            Some(recorder_config)
717        } else {
718            None
719        }
720    }
721
722    async fn invite_or_accept(&self, mut option: CallOption, sender: String) -> Result<CallOption> {
723        // Merge with existing configuration (e.g., from playbook)
724        {
725            let state = self.call_state.read().await;
726            option = state.merge_option(option);
727        }
728
729        option.check_default();
730        if let Some(opt) = self.build_record_option(&option) {
731            self.media_stream.update_recorder_option(opt).await;
732        }
733
734        if let Some(opt) = &option.media_pass {
735            let track_id = self.server_side_track_id.clone();
736            let cancel_token = self.cancel_token.child_token();
737            let ssrc = rand::random::<u32>();
738            let media_pass_track = MediaPassTrack::new(
739                self.session_id.clone(),
740                ssrc,
741                track_id,
742                cancel_token,
743                opt.clone(),
744            );
745            self.update_track_wrapper(Box::new(media_pass_track), None)
746                .await;
747        }
748
749        info!(
750            session_id = self.session_id,
751            call_type = ?self.call_type,
752            sender,
753            ?option,
754            "caller with option"
755        );
756
757        match self.setup_caller_track(&option).await {
758            Ok(_) => return Ok(option),
759            Err(e) => {
760                self.app_state
761                    .total_failed_calls
762                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
763                let error_event = crate::event::SessionEvent::Error {
764                    track_id: self.session_id.clone(),
765                    timestamp: crate::media::get_timestamp(),
766                    sender,
767                    error: e.to_string(),
768                    code: None,
769                };
770                self.event_sender.send(error_event).ok();
771                self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
772                    .await
773                    .ok();
774                return Err(e);
775            }
776        }
777    }
778
779    async fn do_invite(&self, option: CallOption) -> Result<()> {
780        self.invite_or_accept(option, "invite".to_string())
781            .await
782            .map(|_| ())
783    }
784
785    async fn do_accept(&self, mut option: CallOption) -> Result<()> {
786        let has_pending = self
787            .invitation
788            .find_dialog_id_by_session_id(&self.session_id)
789            .is_some();
790        let ready_to_answer_val = {
791            let state = self.call_state.read().await;
792            state.ready_to_answer.is_none()
793        };
794
795        if ready_to_answer_val {
796            if !has_pending {
797                // emit reject event
798                warn!(session_id = self.session_id, "no pending call to accept");
799                let rejet_event = crate::event::SessionEvent::Reject {
800                    track_id: self.session_id.clone(),
801                    timestamp: crate::media::get_timestamp(),
802                    reason: "no pending call".to_string(),
803                    refer: None,
804                    code: Some(486),
805                };
806                self.event_sender.send(rejet_event).ok();
807                self.do_hangup(Some(CallRecordHangupReason::BySystem), None)
808                    .await
809                    .ok();
810                return Err(anyhow::anyhow!("no pending call to accept"));
811            }
812            option = self.invite_or_accept(option, "accept".to_string()).await?;
813        } else {
814            option.check_default();
815            self.call_state.write().await.option = Some(option.clone());
816        }
817        info!(session_id = self.session_id, ?option, "accepting call");
818        let ready = self.call_state.write().await.ready_to_answer.take();
819        if let Some((answer, track, dialog)) = ready {
820            info!(
821                session_id = self.session_id,
822                track_id = track.as_ref().map(|t| t.id()),
823                "ready to answer with track"
824            );
825
826            let headers = vec![rsip::Header::ContentType(
827                "application/sdp".to_string().into(),
828            )];
829
830            match dialog.accept(Some(headers), Some(answer.as_bytes().to_vec())) {
831                Ok(_) => {
832                    {
833                        let mut state = self.call_state.write().await;
834                        state.answer = Some(answer);
835                        state.answer_time = Some(Utc::now());
836                    }
837                    self.finish_caller_stack(&option, track).await?;
838                }
839                Err(e) => {
840                    warn!(session_id = self.session_id, "failed to accept call: {}", e);
841                    return Err(anyhow::anyhow!("failed to accept call"));
842                }
843            }
844        }
845        return Ok(());
846    }
847
848    async fn do_reject(
849        &self,
850        code: Option<rsip::StatusCode>,
851        reason: Option<String>,
852    ) -> Result<()> {
853        match self
854            .invitation
855            .find_dialog_id_by_session_id(&self.session_id)
856        {
857            Some(id) => {
858                info!(
859                    session_id = self.session_id,
860                    ?reason,
861                    ?code,
862                    "rejecting call"
863                );
864                self.invitation.hangup(id, code, reason).await
865            }
866            None => Ok(()),
867        }
868    }
869
870    async fn do_ringing(
871        &self,
872        ringtone: Option<String>,
873        recorder: Option<RecorderOption>,
874        early_media: Option<bool>,
875    ) -> Result<()> {
876        let ready_to_answer_val = self.call_state.read().await.ready_to_answer.is_none();
877        if ready_to_answer_val {
878            let option = CallOption {
879                recorder,
880                ..Default::default()
881            };
882            let _ = self.invite_or_accept(option, "ringing".to_string()).await?;
883        }
884
885        let state = self.call_state.read().await;
886        if let Some((answer, _, dialog)) = state.ready_to_answer.as_ref() {
887            let (headers, body) = if early_media.unwrap_or_default() || ringtone.is_some() {
888                let headers = vec![rsip::Header::ContentType(
889                    "application/sdp".to_string().into(),
890                )];
891                (Some(headers), Some(answer.as_bytes().to_vec()))
892            } else {
893                (None, None)
894            };
895
896            dialog.ringing(headers, body).ok();
897            info!(
898                session_id = self.session_id,
899                ringtone, early_media, "playing ringtone"
900            );
901            if let Some(ringtone_url) = ringtone {
902                drop(state);
903                self.do_play(ringtone_url, None, None, None).await.ok();
904            } else {
905                info!(session_id = self.session_id, "no ringtone to play");
906            }
907        }
908        Ok(())
909    }
910
911    async fn do_tts(
912        &self,
913        text: String,
914        speaker: Option<String>,
915        play_id: Option<String>,
916        auto_hangup: Option<bool>,
917        streaming: bool,
918        end_of_stream: bool,
919        option: Option<SynthesisOption>,
920        wait_input_timeout: Option<u32>,
921        base64: bool,
922    ) -> Result<()> {
923        let tts_option = {
924            let call_state = self.call_state.read().await;
925            match call_state.option.clone().unwrap_or_default().tts {
926                Some(opt) => opt.merge_with(option),
927                None => {
928                    if let Some(opt) = option {
929                        opt
930                    } else {
931                        return Err(anyhow::anyhow!("no tts option available"));
932                    }
933                }
934            }
935        };
936        let speaker = match speaker {
937            Some(s) => Some(s),
938            None => tts_option.speaker.clone(),
939        };
940
941        let mut play_command = SynthesisCommand {
942            text,
943            speaker,
944            play_id: play_id.clone(),
945            streaming,
946            end_of_stream,
947            option: tts_option,
948            base64,
949        };
950        info!(
951            session_id = self.session_id,
952            provider = ?play_command.option.provider,
953            text = %play_command.text.chars().take(10).collect::<String>(),
954            speaker = play_command.speaker.as_deref(),
955            auto_hangup = auto_hangup.unwrap_or_default(),
956            play_id = play_command.play_id.as_deref(),
957            streaming = play_command.streaming,
958            end_of_stream = play_command.end_of_stream,
959            wait_input_timeout = wait_input_timeout.unwrap_or_default(),
960            is_base64 = play_command.base64,
961            "new synthesis"
962        );
963
964        let ssrc = rand::random::<u32>();
965        let (should_interrupt, picked_ssrc) = {
966            let mut state = self.call_state.write().await;
967
968            let (target_ssrc, changed) = if let Some(handle) = &state.tts_handle {
969                if play_id.is_some() && state.current_play_id != play_id {
970                    (ssrc, true)
971                } else {
972                    (handle.ssrc, false)
973                }
974            } else {
975                (ssrc, false)
976            };
977
978            state.auto_hangup = match auto_hangup {
979                Some(true) => Some((target_ssrc, CallRecordHangupReason::BySystem)),
980                _ => state.auto_hangup.clone(),
981            };
982            state.wait_input_timeout = wait_input_timeout;
983
984            state.current_play_id = play_id.clone();
985            (changed, target_ssrc)
986        };
987
988        if should_interrupt {
989            let _ = self.do_interrupt(false).await;
990        }
991
992        let existing_handle = self.call_state.read().await.tts_handle.clone();
993        if let Some(tts_handle) = existing_handle {
994            match tts_handle.try_send(play_command) {
995                Ok(_) => return Ok(()),
996                Err(e) => {
997                    play_command = e.0;
998                }
999            }
1000        }
1001
1002        let (new_handle, tts_track) = StreamEngine::create_tts_track(
1003            self.app_state.stream_engine.clone(),
1004            self.cancel_token.child_token(),
1005            self.session_id.clone(),
1006            self.server_side_track_id.clone(),
1007            picked_ssrc,
1008            play_id.clone(),
1009            streaming,
1010            &play_command.option,
1011        )
1012        .await?;
1013
1014        new_handle.try_send(play_command)?;
1015        self.call_state.write().await.tts_handle = Some(new_handle);
1016        self.update_track_wrapper(tts_track, play_id).await;
1017        Ok(())
1018    }
1019
1020    async fn do_play(
1021        &self,
1022        url: String,
1023        play_id: Option<String>,
1024        auto_hangup: Option<bool>,
1025        wait_input_timeout: Option<u32>,
1026    ) -> Result<()> {
1027        let ssrc = rand::random::<u32>();
1028        info!(
1029            session_id = self.session_id,
1030            ssrc, url, play_id, auto_hangup, "play file track"
1031        );
1032
1033        let play_id = play_id.or(Some(url.clone()));
1034
1035        let file_track = FileTrack::new(self.server_side_track_id.clone())
1036            .with_play_id(play_id.clone())
1037            .with_ssrc(ssrc)
1038            .with_path(url)
1039            .with_cancel_token(self.cancel_token.child_token());
1040
1041        {
1042            let mut state = self.call_state.write().await;
1043            state.tts_handle = None;
1044            state.auto_hangup = match auto_hangup {
1045                Some(true) => Some((ssrc, CallRecordHangupReason::BySystem)),
1046                _ => None,
1047            };
1048            state.wait_input_timeout = wait_input_timeout;
1049        }
1050
1051        self.update_track_wrapper(Box::new(file_track), play_id)
1052            .await;
1053        Ok(())
1054    }
1055
1056    async fn do_history(&self, speaker: String, text: String) -> Result<()> {
1057        self.event_sender
1058            .send(SessionEvent::AddHistory {
1059                sender: Some(self.session_id.clone()),
1060                timestamp: crate::media::get_timestamp(),
1061                speaker,
1062                text,
1063            })
1064            .map(|_| ())
1065            .map_err(Into::into)
1066    }
1067
1068    async fn do_interrupt(&self, graceful: bool) -> Result<()> {
1069        {
1070            let mut state = self.call_state.write().await;
1071            state.tts_handle = None;
1072            state.moh = None;
1073        }
1074        self.media_stream
1075            .remove_track(&self.server_side_track_id, graceful)
1076            .await;
1077        Ok(())
1078    }
    /// Placeholder: performs no work and always returns `Ok(())`.
    async fn do_pause(&self) -> Result<()> {
        Ok(())
    }
    /// Placeholder: performs no work and always returns `Ok(())`.
    async fn do_resume(&self) -> Result<()> {
        Ok(())
    }
1085    async fn do_hangup(
1086        &self,
1087        reason: Option<CallRecordHangupReason>,
1088        initiator: Option<String>,
1089    ) -> Result<()> {
1090        info!(
1091            session_id = self.session_id,
1092            ?reason,
1093            ?initiator,
1094            "do_hangup"
1095        );
1096
1097        // Set hangup reason based on initiator and reason
1098        let hangup_reason = match initiator.as_deref() {
1099            Some("caller") => CallRecordHangupReason::ByCaller,
1100            Some("callee") => CallRecordHangupReason::ByCallee,
1101            Some("system") => CallRecordHangupReason::Autohangup,
1102            _ => reason.unwrap_or(CallRecordHangupReason::BySystem),
1103        };
1104
1105        self.media_stream
1106            .stop(Some(hangup_reason.to_string()), initiator);
1107
1108        self.call_state
1109            .write()
1110            .await
1111            .set_hangup_reason(hangup_reason);
1112        Ok(())
1113    }
1114
1115    async fn do_refer(
1116        &self,
1117        caller: String,
1118        callee: String,
1119        refer_option: Option<ReferOption>,
1120    ) -> Result<()> {
1121        self.do_interrupt(false).await.ok();
1122        let mut moh = refer_option.as_ref().and_then(|o| o.moh.clone());
1123        if let Some(ref path) = moh {
1124            if !path.starts_with("http") && !std::path::Path::new(path).exists() {
1125                let fallback = "./config/sounds/refer_moh.wav";
1126                if std::path::Path::new(fallback).exists() {
1127                    info!(
1128                        session_id = self.session_id,
1129                        "moh {} not found, using fallback {}", path, fallback
1130                    );
1131                    moh = Some(fallback.to_string());
1132                }
1133            }
1134        }
1135        let session_id = self.session_id.clone();
1136        let track_id = self.server_side_track_id.clone();
1137
1138        let recorder = {
1139            let cs = self.call_state.read().await;
1140            cs.option
1141                .as_ref()
1142                .map(|o| o.recorder.clone())
1143                .unwrap_or_default()
1144        };
1145
1146        let call_option = CallOption {
1147            caller: Some(caller),
1148            callee: Some(callee.clone()),
1149            sip: refer_option.as_ref().and_then(|o| o.sip.clone()),
1150            asr: refer_option.as_ref().and_then(|o| o.asr.clone()),
1151            denoise: refer_option.as_ref().and_then(|o| o.denoise.clone()),
1152            recorder,
1153            ..Default::default()
1154        };
1155
1156        let mut invite_option = call_option.build_invite_option()?;
1157        invite_option.call_id = Some(self.session_id.clone());
1158
1159        let headers = invite_option.headers.get_or_insert_with(|| Vec::new());
1160
1161        {
1162            let cs = self.call_state.read().await;
1163            if let Some(opt) = cs.option.as_ref() {
1164                if let Some(callee) = opt.callee.as_ref() {
1165                    headers.push(rsip::Header::Other(
1166                        "X-Referred-To".to_string(),
1167                        callee.clone(),
1168                    ));
1169                }
1170                if let Some(caller) = opt.caller.as_ref() {
1171                    headers.push(rsip::Header::Other(
1172                        "X-Referred-From".to_string(),
1173                        caller.clone(),
1174                    ));
1175                }
1176            }
1177        }
1178
1179        headers.push(rsip::Header::Other(
1180            "X-Referred-Id".to_string(),
1181            self.session_id.clone(),
1182        ));
1183
1184        let ssrc = rand::random::<u32>();
1185        let refer_call_state = Arc::new(RwLock::new(ActiveCallState {
1186            start_time: Utc::now(),
1187            ssrc,
1188            option: Some(call_option.clone()),
1189            is_refer: true,
1190            ..Default::default()
1191        }));
1192
1193        {
1194            let mut cs = self.call_state.write().await;
1195            cs.refer_callstate.replace(refer_call_state.clone());
1196        }
1197
1198        let auto_hangup_requested = refer_option
1199            .as_ref()
1200            .and_then(|o| o.auto_hangup)
1201            .unwrap_or(true);
1202
1203        if auto_hangup_requested {
1204            self.call_state.write().await.auto_hangup =
1205                Some((ssrc, CallRecordHangupReason::ByRefer));
1206        } else {
1207            self.call_state.write().await.auto_hangup = None;
1208        }
1209
1210        let timeout_secs = refer_option.as_ref().and_then(|o| o.timeout).unwrap_or(30);
1211
1212        info!(
1213            session_id = self.session_id,
1214            ssrc,
1215            auto_hangup = auto_hangup_requested,
1216            callee,
1217            timeout_secs,
1218            "do_refer"
1219        );
1220
1221        let r = tokio::time::timeout(
1222            Duration::from_secs(timeout_secs as u64),
1223            self.create_outgoing_sip_track(
1224                self.cancel_token.child_token(),
1225                refer_call_state.clone(),
1226                &track_id,
1227                invite_option,
1228                &call_option,
1229                moh,
1230                auto_hangup_requested,
1231            ),
1232        )
1233        .await;
1234
1235        {
1236            self.call_state.write().await.moh = None;
1237        }
1238
1239        let result = match r {
1240            Ok(res) => res,
1241            Err(_) => {
1242                warn!(
1243                    session_id = session_id,
1244                    "refer sip track creation timed out after {} seconds", timeout_secs
1245                );
1246                self.event_sender
1247                    .send(SessionEvent::Reject {
1248                        track_id,
1249                        timestamp: crate::media::get_timestamp(),
1250                        reason: "Timeout when refer".into(),
1251                        code: Some(408),
1252                        refer: Some(true),
1253                    })
1254                    .ok();
1255                return Err(anyhow::anyhow!("refer sip track creation timed out").into());
1256            }
1257        };
1258
1259        match result {
1260            Ok(answer) => {
1261                self.event_sender
1262                    .send(SessionEvent::Answer {
1263                        timestamp: crate::media::get_timestamp(),
1264                        track_id,
1265                        sdp: answer,
1266                        refer: Some(true),
1267                    })
1268                    .ok();
1269            }
1270            Err(e) => {
1271                warn!(
1272                    session_id = session_id,
1273                    "failed to create refer sip track: {}", e
1274                );
1275                match &e {
1276                    rsipstack::Error::DialogError(reason, _, code) => {
1277                        self.event_sender
1278                            .send(SessionEvent::Reject {
1279                                track_id,
1280                                timestamp: crate::media::get_timestamp(),
1281                                reason: reason.clone(),
1282                                code: Some(code.code() as u32),
1283                                refer: Some(true),
1284                            })
1285                            .ok();
1286                    }
1287                    _ => {}
1288                }
1289                return Err(e.into());
1290            }
1291        }
1292        Ok(())
1293    }
1294
1295    async fn do_mute(&self, track_id: Option<String>) -> Result<()> {
1296        self.media_stream.mute_track(track_id).await;
1297        Ok(())
1298    }
1299
1300    async fn do_unmute(&self, track_id: Option<String>) -> Result<()> {
1301        self.media_stream.unmute_track(track_id).await;
1302        Ok(())
1303    }
1304
1305    pub async fn cleanup(&self) -> Result<()> {
1306        self.call_state.write().await.tts_handle = None;
1307        self.media_stream.cleanup().await.ok();
1308        Ok(())
1309    }
1310
1311    pub fn get_callrecord(&self) -> Option<CallRecord> {
1312        self.call_state.try_read().ok().map(|call_state| {
1313            call_state.build_callrecord(
1314                self.app_state.clone(),
1315                self.session_id.clone(),
1316                self.call_type.clone(),
1317            )
1318        })
1319    }
1320
1321    async fn dump_to_file(
1322        &self,
1323        dump_file: &mut File,
1324        cmd_receiver: &mut CommandReceiver,
1325        event_receiver: &mut EventReceiver,
1326    ) {
1327        loop {
1328            select! {
1329                _ = self.cancel_token.cancelled() => {
1330                    break;
1331                }
1332                Ok(cmd) = cmd_receiver.recv() => {
1333                    CallRecordEvent::write(CallRecordEventType::Command, cmd, dump_file)
1334                        .await;
1335                }
1336                Ok(event) = event_receiver.recv() => {
1337                    if matches!(event, SessionEvent::Binary{..}) {
1338                        continue;
1339                    }
1340                    CallRecordEvent::write(CallRecordEventType::Event, event, dump_file)
1341                        .await;
1342                }
1343            };
1344        }
1345    }
1346
1347    async fn dump_loop(
1348        &self,
1349        dump_events: bool,
1350        mut dump_cmd_receiver: CommandReceiver,
1351        mut dump_event_receiver: EventReceiver,
1352    ) {
1353        if !dump_events {
1354            return;
1355        }
1356
1357        let file_name = self.app_state.get_dump_events_file(&self.session_id);
1358        let mut dump_file = match File::options()
1359            .create(true)
1360            .append(true)
1361            .open(&file_name)
1362            .await
1363        {
1364            Ok(file) => file,
1365            Err(e) => {
1366                warn!(
1367                    session_id = self.session_id,
1368                    file_name, "failed to open dump events file: {}", e
1369                );
1370                return;
1371            }
1372        };
1373        self.dump_to_file(
1374            &mut dump_file,
1375            &mut dump_cmd_receiver,
1376            &mut dump_event_receiver,
1377        )
1378        .await;
1379
1380        while let Ok(event) = dump_event_receiver.try_recv() {
1381            if matches!(event, SessionEvent::Binary { .. }) {
1382                continue;
1383            }
1384            CallRecordEvent::write(CallRecordEventType::Event, event, &mut dump_file).await;
1385        }
1386    }
1387
1388    pub async fn create_rtp_track(&self, track_id: TrackId, ssrc: u32) -> Result<RtcTrack> {
1389        let mut rtc_config = RtcTrackConfig::default();
1390        rtc_config.mode = rustrtc::TransportMode::Rtp;
1391
1392        if let Some(codecs) = &self.app_state.config.codecs {
1393            let mut codec_types = Vec::new();
1394            for c in codecs {
1395                match c.to_lowercase().as_str() {
1396                    "pcmu" => codec_types.push(CodecType::PCMU),
1397                    "pcma" => codec_types.push(CodecType::PCMA),
1398                    "g722" => codec_types.push(CodecType::G722),
1399                    "g729" => codec_types.push(CodecType::G729),
1400                    "opus" => codec_types.push(CodecType::Opus),
1401                    "dtmf" | "2833" | "telephone_event" => {
1402                        codec_types.push(CodecType::TelephoneEvent)
1403                    }
1404                    _ => {}
1405                }
1406            }
1407            if !codec_types.is_empty() {
1408                rtc_config.preferred_codec = Some(codec_types[0].clone());
1409                rtc_config.codecs = codec_types;
1410            }
1411        }
1412
1413        if rtc_config.preferred_codec.is_none() {
1414            rtc_config.preferred_codec = Some(self.track_config.codec.clone());
1415        }
1416
1417        rtc_config.rtp_port_range = self
1418            .app_state
1419            .config
1420            .rtp_start_port
1421            .zip(self.app_state.config.rtp_end_port);
1422
1423        if let Some(ref external_ip) = self.app_state.config.external_ip {
1424            rtc_config.external_ip = Some(external_ip.clone());
1425        }
1426
1427        rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
1428
1429        let mut track = RtcTrack::new(
1430            self.cancel_token.child_token(),
1431            track_id,
1432            self.track_config.clone(),
1433            rtc_config,
1434        )
1435        .with_ssrc(ssrc);
1436
1437        track.create().await?;
1438
1439        Ok(track)
1440    }
1441
1442    async fn setup_caller_track(&self, option: &CallOption) -> Result<()> {
1443        self.call_state.write().await.option = Some(option.clone());
1444        info!(
1445            session_id = self.session_id,
1446            call_type = ?self.call_type,
1447            "setup caller track"
1448        );
1449
1450        let track = match self.call_type {
1451            ActiveCallType::Webrtc => Some(self.create_webrtc_track().await?),
1452            ActiveCallType::WebSocket => {
1453                let audio_receiver = self.call_state.write().await.audio_receiver.take();
1454                if let Some(receiver) = audio_receiver {
1455                    Some(self.create_websocket_track(receiver).await?)
1456                } else {
1457                    None
1458                }
1459            }
1460            ActiveCallType::Sip => {
1461                if let Some(dialog_id) = self
1462                    .invitation
1463                    .find_dialog_id_by_session_id(&self.session_id)
1464                {
1465                    if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1466                        return self
1467                            .prepare_incoming_sip_track(
1468                                self.cancel_token.clone(),
1469                                self.call_state.clone(),
1470                                &self.session_id,
1471                                pending_dialog,
1472                            )
1473                            .await;
1474                    }
1475                }
1476
1477                let mut invite_option = option.build_invite_option()?;
1478                invite_option.call_id = Some(self.session_id.clone());
1479
1480                match self
1481                    .create_outgoing_sip_track(
1482                        self.cancel_token.clone(),
1483                        self.call_state.clone(),
1484                        &self.session_id,
1485                        invite_option,
1486                        &option,
1487                        None,
1488                        false,
1489                    )
1490                    .await
1491                {
1492                    Ok(answer) => {
1493                        self.event_sender
1494                            .send(SessionEvent::Answer {
1495                                timestamp: crate::media::get_timestamp(),
1496                                track_id: self.session_id.clone(),
1497                                sdp: answer,
1498                                refer: Some(false),
1499                            })
1500                            .ok();
1501                        return Ok(());
1502                    }
1503                    Err(e) => {
1504                        warn!(
1505                            session_id = self.session_id,
1506                            "failed to create sip track: {}", e
1507                        );
1508                        match &e {
1509                            rsipstack::Error::DialogError(reason, _, code) => {
1510                                self.event_sender
1511                                    .send(SessionEvent::Reject {
1512                                        track_id: self.session_id.clone(),
1513                                        timestamp: crate::media::get_timestamp(),
1514                                        reason: reason.clone(),
1515                                        code: Some(code.code() as u32),
1516                                        refer: Some(false),
1517                                    })
1518                                    .ok();
1519                            }
1520                            _ => {}
1521                        }
1522                        return Err(e.into());
1523                    }
1524                }
1525            }
1526            ActiveCallType::B2bua => {
1527                if let Some(dialog_id) = self
1528                    .invitation
1529                    .find_dialog_id_by_session_id(&self.session_id)
1530                {
1531                    if let Some(pending_dialog) = self.invitation.get_pending_call(&dialog_id) {
1532                        return self
1533                            .prepare_incoming_sip_track(
1534                                self.cancel_token.clone(),
1535                                self.call_state.clone(),
1536                                &self.session_id,
1537                                pending_dialog,
1538                            )
1539                            .await;
1540                    }
1541                }
1542
1543                warn!(
1544                    session_id = self.session_id,
1545                    "no pending dialog found for B2BUA call"
1546                );
1547                return Err(anyhow::anyhow!(
1548                    "no pending dialog found for session_id: {}",
1549                    self.session_id
1550                ));
1551            }
1552        };
1553        match track {
1554            Some(track) => {
1555                self.finish_caller_stack(&option, Some(track)).await?;
1556            }
1557            None => {
1558                warn!(session_id = self.session_id, "no track created for caller");
1559                return Err(anyhow::anyhow!("no track created for caller"));
1560            }
1561        }
1562        Ok(())
1563    }
1564
1565    async fn finish_caller_stack(
1566        &self,
1567        option: &CallOption,
1568        track: Option<Box<dyn Track>>,
1569    ) -> Result<()> {
1570        if let Some(track) = track {
1571            self.setup_track_with_stream(&option, track).await?;
1572        }
1573
1574        {
1575            let call_state = self.call_state.read().await;
1576            if let Some(ref answer) = call_state.answer {
1577                info!(
1578                    session_id = self.session_id,
1579                    "sending answer event: {}", answer,
1580                );
1581                self.event_sender
1582                    .send(SessionEvent::Answer {
1583                        timestamp: crate::media::get_timestamp(),
1584                        track_id: self.session_id.clone(),
1585                        sdp: answer.clone(),
1586                        refer: Some(false),
1587                    })
1588                    .ok();
1589            } else {
1590                warn!(
1591                    session_id = self.session_id,
1592                    "no answer in state to send event"
1593                );
1594            }
1595        }
1596        Ok(())
1597    }
1598
1599    pub async fn setup_track_with_stream(
1600        &self,
1601        option: &CallOption,
1602        mut track: Box<dyn Track>,
1603    ) -> Result<()> {
1604        let processors = match StreamEngine::create_processors(
1605            self.app_state.stream_engine.clone(),
1606            track.as_ref(),
1607            self.cancel_token.child_token(),
1608            self.event_sender.clone(),
1609            self.media_stream.packet_sender.clone(),
1610            option,
1611        )
1612        .await
1613        {
1614            Ok(processors) => processors,
1615            Err(e) => {
1616                warn!(
1617                    session_id = self.session_id,
1618                    "failed to prepare stream processors: {}", e
1619                );
1620                vec![]
1621            }
1622        };
1623
1624        // Add all processors from the hook
1625        for processor in processors {
1626            track.append_processor(processor);
1627        }
1628
1629        self.update_track_wrapper(track, None).await;
1630        Ok(())
1631    }
1632
1633    pub async fn update_track_wrapper(&self, mut track: Box<dyn Track>, play_id: Option<String>) {
1634        let ambiance_opt = {
1635            let state = self.call_state.read().await;
1636            let mut opt = state
1637                .option
1638                .as_ref()
1639                .and_then(|o| o.ambiance.clone())
1640                .unwrap_or_default();
1641
1642            if let Some(global) = &self.app_state.config.ambiance {
1643                opt.merge(global);
1644            }
1645            opt
1646        };
1647        if track.id() == &self.server_side_track_id && ambiance_opt.path.is_some() {
1648            match AmbianceProcessor::new(ambiance_opt).await {
1649                Ok(ambiance) => {
1650                    info!(session_id = self.session_id, "loaded ambiance processor");
1651                    track.append_processor(Box::new(ambiance));
1652                }
1653                Err(e) => {
1654                    tracing::error!("failed to load ambiance wav {}", e);
1655                }
1656            }
1657        }
1658        self.call_state.write().await.current_play_id = play_id.clone();
1659        self.media_stream.update_track(track, play_id).await;
1660    }
1661
1662    pub async fn create_websocket_track(
1663        &self,
1664        audio_receiver: WebsocketBytesReceiver,
1665    ) -> Result<Box<dyn Track>> {
1666        let (ssrc, codec) = {
1667            let call_state = self.call_state.read().await;
1668            (
1669                call_state.ssrc,
1670                call_state
1671                    .option
1672                    .as_ref()
1673                    .map(|o| o.codec.clone())
1674                    .unwrap_or_default(),
1675            )
1676        };
1677
1678        let ws_track = WebsocketTrack::new(
1679            self.cancel_token.child_token(),
1680            self.session_id.clone(),
1681            self.track_config.clone(),
1682            self.event_sender.clone(),
1683            audio_receiver,
1684            codec,
1685            ssrc,
1686        );
1687
1688        {
1689            let mut call_state = self.call_state.write().await;
1690            call_state.answer_time = Some(Utc::now());
1691            call_state.answer = Some("".to_string());
1692            call_state.last_status_code = 200;
1693        }
1694
1695        Ok(Box::new(ws_track))
1696    }
1697
1698    pub(super) async fn create_webrtc_track(&self) -> Result<Box<dyn Track>> {
1699        let (ssrc, option) = {
1700            let call_state = self.call_state.read().await;
1701            (
1702                call_state.ssrc,
1703                call_state.option.clone().unwrap_or_default(),
1704            )
1705        };
1706
1707        let mut rtc_config = RtcTrackConfig::default();
1708        rtc_config.mode = rustrtc::TransportMode::WebRtc; // WebRTC
1709        rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1710
1711        if let Some(codecs) = &self.app_state.config.codecs {
1712            let mut codec_types = Vec::new();
1713            for c in codecs {
1714                match c.to_lowercase().as_str() {
1715                    "pcmu" => codec_types.push(CodecType::PCMU),
1716                    "pcma" => codec_types.push(CodecType::PCMA),
1717                    "g722" => codec_types.push(CodecType::G722),
1718                    "g729" => codec_types.push(CodecType::G729),
1719                    "opus" => codec_types.push(CodecType::Opus),
1720                    "dtmf" | "2833" | "telephone_event" => {
1721                        codec_types.push(CodecType::TelephoneEvent)
1722                    }
1723                    _ => {}
1724                }
1725            }
1726            if !codec_types.is_empty() {
1727                rtc_config.preferred_codec = Some(codec_types[0].clone());
1728                rtc_config.codecs = codec_types;
1729            }
1730        }
1731
1732        if let Some(ref external_ip) = self.app_state.config.external_ip {
1733            rtc_config.external_ip = Some(external_ip.clone());
1734        }
1735
1736        let mut webrtc_track = RtcTrack::new(
1737            self.cancel_token.child_token(),
1738            self.session_id.clone(),
1739            self.track_config.clone(),
1740            rtc_config,
1741        )
1742        .with_ssrc(ssrc);
1743
1744        let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1745        let offer = match option.enable_ipv6 {
1746            Some(false) | None => {
1747                strip_ipv6_candidates(option.offer.as_ref().unwrap_or(&"".to_string()))
1748            }
1749            _ => option.offer.clone().unwrap_or("".to_string()),
1750        };
1751        let answer: Option<String>;
1752        match webrtc_track.handshake(offer, timeout).await {
1753            Ok(answer_sdp) => {
1754                answer = match option.enable_ipv6 {
1755                    Some(false) | None => Some(strip_ipv6_candidates(&answer_sdp)),
1756                    Some(true) => Some(answer_sdp.to_string()),
1757                };
1758            }
1759            Err(e) => {
1760                warn!(session_id = self.session_id, "failed to setup track: {}", e);
1761                return Err(anyhow::anyhow!("Failed to setup track: {}", e));
1762            }
1763        }
1764
1765        {
1766            let mut call_state = self.call_state.write().await;
1767            call_state.answer_time = Some(Utc::now());
1768            call_state.answer = answer;
1769            call_state.last_status_code = 200;
1770        }
1771        Ok(Box::new(webrtc_track))
1772    }
1773
1774    async fn create_outgoing_sip_track(
1775        &self,
1776        cancel_token: CancellationToken,
1777        call_state_ref: ActiveCallStateRef,
1778        track_id: &String,
1779        mut invite_option: InviteOption,
1780        call_option: &CallOption,
1781        moh: Option<String>,
1782        auto_hangup: bool,
1783    ) -> Result<String, rsipstack::Error> {
1784        let ssrc = call_state_ref.read().await.ssrc;
1785        let rtp_track = self
1786            .create_rtp_track(track_id.clone(), ssrc)
1787            .await
1788            .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1789
1790        let offer = Some(
1791            rtp_track
1792                .local_description()
1793                .await
1794                .map_err(|e| rsipstack::Error::Error(e.to_string()))?,
1795        );
1796
1797        {
1798            let mut cs = call_state_ref.write().await;
1799            if let Some(o) = cs.option.as_mut() {
1800                o.offer = offer.clone();
1801            }
1802            cs.start_time = Utc::now();
1803        };
1804
1805        invite_option.offer = offer.clone().map(|s| s.into());
1806
1807        // Set contact to local SIP endpoint address if not already set explicitly
1808        // Check if contact is still default (no scheme set) or if host is localhost-like
1809        let needs_contact = invite_option.contact.scheme.is_none()
1810            || invite_option
1811                .contact
1812                .host_with_port
1813                .to_string()
1814                .starts_with("127.0.0.1");
1815
1816        if needs_contact {
1817            if let Some(addr) = self.invitation.dialog_layer.endpoint.get_addrs().first() {
1818                invite_option.contact = rsip::Uri {
1819                    scheme: Some(rsip::Scheme::Sip),
1820                    auth: None,
1821                    host_with_port: addr.addr.clone(),
1822                    params: vec![],
1823                    headers: vec![],
1824                };
1825            }
1826        }
1827
1828        let mut rtp_track_to_setup = Some(Box::new(rtp_track) as Box<dyn Track>);
1829
1830        if let Some(moh) = moh {
1831            let ssrc_and_moh = {
1832                let mut state = call_state_ref.write().await;
1833                state.moh = Some(moh.clone());
1834                if state.current_play_id.is_none() {
1835                    let ssrc = rand::random::<u32>();
1836                    Some((ssrc, moh.clone()))
1837                } else {
1838                    info!(
1839                        session_id = self.session_id,
1840                        "Something is playing, MOH will start after it ends"
1841                    );
1842                    None
1843                }
1844            };
1845
1846            if let Some((ssrc, moh_path)) = ssrc_and_moh {
1847                let file_track = FileTrack::new(self.server_side_track_id.clone())
1848                    .with_play_id(Some(moh_path.clone()))
1849                    .with_ssrc(ssrc)
1850                    .with_path(moh_path.clone())
1851                    .with_cancel_token(self.cancel_token.child_token());
1852                self.update_track_wrapper(Box::new(file_track), Some(moh_path))
1853                    .await;
1854            }
1855        } else {
1856            let track = rtp_track_to_setup.take().unwrap();
1857            self.setup_track_with_stream(&call_option, track)
1858                .await
1859                .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1860        }
1861
1862        info!(
1863            session_id = self.session_id,
1864            track_id,
1865            contact = %invite_option.contact,
1866            "invite {} -> {} offer: \n{}",
1867            invite_option.caller,
1868            invite_option.callee,
1869            offer.as_ref().map(|s| s.as_str()).unwrap_or("<NO OFFER>")
1870        );
1871
1872        let (dlg_state_sender, dlg_state_receiver) =
1873            self.invitation.dialog_layer.new_dialog_state_channel();
1874
1875        let states = InviteDialogStates {
1876            is_client: true,
1877            session_id: self.session_id.clone(),
1878            track_id: track_id.clone(),
1879            event_sender: self.event_sender.clone(),
1880            media_stream: self.media_stream.clone(),
1881            call_state: call_state_ref.clone(),
1882            cancel_token,
1883            terminated_reason: None,
1884            has_early_media: false,
1885        };
1886
1887        let mut client_dialog_handler =
1888            DialogStateReceiverGuard::new(self.invitation.dialog_layer.clone(), dlg_state_receiver);
1889
1890        crate::spawn(async move {
1891            client_dialog_handler.process_dialog(states).await;
1892        });
1893
1894        let (dialog_id, answer) = self
1895            .invitation
1896            .invite(invite_option, dlg_state_sender)
1897            .await?;
1898
1899        self.call_state.write().await.moh = None;
1900
1901        if let Some(track) = rtp_track_to_setup {
1902            self.setup_track_with_stream(&call_option, track)
1903                .await
1904                .map_err(|e| rsipstack::Error::Error(e.to_string()))?;
1905        }
1906
1907        let answer = match answer {
1908            Some(answer) => String::from_utf8_lossy(&answer).to_string(),
1909            None => {
1910                warn!(session_id = self.session_id, "no answer received");
1911                return Err(rsipstack::Error::DialogError(
1912                    "No answer received".to_string(),
1913                    dialog_id,
1914                    rsip::StatusCode::NotAcceptableHere,
1915                ));
1916            }
1917        };
1918
1919        {
1920            let mut cs = call_state_ref.write().await;
1921            if cs.answer.is_none() {
1922                cs.answer = Some(answer.clone());
1923            }
1924            if auto_hangup {
1925                cs.auto_hangup = Some((ssrc, CallRecordHangupReason::ByRefer));
1926            }
1927        }
1928
1929        self.media_stream
1930            .update_remote_description(&track_id, &answer)
1931            .await
1932            .ok();
1933
1934        Ok(answer)
1935    }
1936
1937    /// Detect if SDP is WebRTC format
1938    pub fn is_webrtc_sdp(sdp: &str) -> bool {
1939        (sdp.contains("a=ice-ufrag:") || sdp.contains("a=ice-pwd:"))
1940            && sdp.contains("a=fingerprint:")
1941    }
1942
1943    pub async fn setup_answer_track(
1944        &self,
1945        ssrc: u32,
1946        option: &CallOption,
1947        offer: String,
1948    ) -> Result<(String, Box<dyn Track>)> {
1949        let offer = match option.enable_ipv6 {
1950            Some(false) | None => strip_ipv6_candidates(&offer),
1951            _ => offer.clone(),
1952        };
1953
1954        let timeout = option.handshake_timeout.map(|t| Duration::from_secs(t));
1955
1956        let mut media_track = if Self::is_webrtc_sdp(&offer) {
1957            let mut rtc_config = RtcTrackConfig::default();
1958            rtc_config.mode = rustrtc::TransportMode::WebRtc;
1959            rtc_config.ice_servers = self.app_state.config.ice_servers.clone();
1960            if let Some(ref external_ip) = self.app_state.config.external_ip {
1961                rtc_config.external_ip = Some(external_ip.clone());
1962            }
1963            rtc_config.enable_latching = self.app_state.config.enable_rtp_latching;
1964
1965            let webrtc_track = RtcTrack::new(
1966                self.cancel_token.child_token(),
1967                self.session_id.clone(),
1968                self.track_config.clone(),
1969                rtc_config,
1970            )
1971            .with_ssrc(ssrc);
1972
1973            Box::new(webrtc_track) as Box<dyn Track>
1974        } else {
1975            let rtp_track = self.create_rtp_track(self.session_id.clone(), ssrc).await?;
1976            Box::new(rtp_track) as Box<dyn Track>
1977        };
1978
1979        let answer = match media_track.handshake(offer.clone(), timeout).await {
1980            Ok(answer) => answer,
1981            Err(e) => {
1982                return Err(anyhow::anyhow!("handshake failed: {e}"));
1983            }
1984        };
1985
1986        return Ok((answer, media_track));
1987    }
1988
1989    pub async fn prepare_incoming_sip_track(
1990        &self,
1991        cancel_token: CancellationToken,
1992        call_state_ref: ActiveCallStateRef,
1993        track_id: &String,
1994        pending_dialog: PendingDialog,
1995    ) -> Result<()> {
1996        let state_receiver = pending_dialog.state_receiver;
1997        //let pending_token_clone = pending_dialog.token;
1998
1999        let states = InviteDialogStates {
2000            is_client: false,
2001            session_id: self.session_id.clone(),
2002            track_id: track_id.clone(),
2003            event_sender: self.event_sender.clone(),
2004            media_stream: self.media_stream.clone(),
2005            call_state: self.call_state.clone(),
2006            cancel_token,
2007            terminated_reason: None,
2008            has_early_media: false,
2009        };
2010
2011        let initial_request = pending_dialog.dialog.initial_request();
2012        let offer = String::from_utf8_lossy(&initial_request.body).to_string();
2013
2014        let (ssrc, option) = {
2015            let call_state = call_state_ref.read().await;
2016            (
2017                call_state.ssrc,
2018                call_state.option.clone().unwrap_or_default(),
2019            )
2020        };
2021
2022        match self.setup_answer_track(ssrc, &option, offer).await {
2023            Ok((offer, track)) => {
2024                self.setup_track_with_stream(&option, track).await?;
2025                {
2026                    let mut state = self.call_state.write().await;
2027                    state.ready_to_answer = Some((offer, None, pending_dialog.dialog));
2028                }
2029            }
2030            Err(e) => {
2031                return Err(anyhow::anyhow!("error creating track: {}", e));
2032            }
2033        }
2034
2035        let mut client_dialog_handler =
2036            DialogStateReceiverGuard::new(self.invitation.dialog_layer.clone(), state_receiver);
2037
2038        crate::spawn(async move {
2039            client_dialog_handler.process_dialog(states).await;
2040        });
2041        Ok(())
2042    }
2043}
2044
2045impl Drop for ActiveCall {
2046    fn drop(&mut self) {
2047        info!(session_id = self.session_id, "dropping active call");
2048        if let Some(sender) = self.app_state.callrecord_sender.as_ref() {
2049            if let Some(record) = self.get_callrecord() {
2050                if let Err(e) = sender.send(record) {
2051                    warn!(
2052                        session_id = self.session_id,
2053                        "failed to send call record: {}", e
2054                    );
2055                }
2056            }
2057        }
2058    }
2059}
2060
2061impl ActiveCallState {
2062    pub fn merge_option(&self, mut option: CallOption) -> CallOption {
2063        if let Some(existing) = &self.option {
2064            if option.asr.is_none() {
2065                option.asr = existing.asr.clone();
2066            }
2067            if option.tts.is_none() {
2068                option.tts = existing.tts.clone();
2069            }
2070            if option.vad.is_none() {
2071                option.vad = existing.vad.clone();
2072            }
2073            if option.denoise.is_none() {
2074                option.denoise = existing.denoise;
2075            }
2076            if option.recorder.is_none() {
2077                option.recorder = existing.recorder.clone();
2078            }
2079            if option.eou.is_none() {
2080                option.eou = existing.eou.clone();
2081            }
2082            if option.extra.is_none() {
2083                option.extra = existing.extra.clone();
2084            }
2085            if option.ambiance.is_none() {
2086                option.ambiance = existing.ambiance.clone();
2087            }
2088        }
2089        option
2090    }
2091
2092    pub fn set_hangup_reason(&mut self, reason: CallRecordHangupReason) {
2093        if self.hangup_reason.is_none() {
2094            self.hangup_reason = Some(reason);
2095        }
2096    }
2097
2098    pub fn build_hangup_event(
2099        &self,
2100        track_id: TrackId,
2101        initiator: Option<String>,
2102    ) -> crate::event::SessionEvent {
2103        let from = self.option.as_ref().and_then(|o| o.caller.as_ref());
2104        let to = self.option.as_ref().and_then(|o| o.callee.as_ref());
2105        let extra = self.extras.clone();
2106
2107        crate::event::SessionEvent::Hangup {
2108            track_id,
2109            timestamp: crate::media::get_timestamp(),
2110            reason: Some(format!("{:?}", self.hangup_reason)),
2111            initiator,
2112            start_time: self.start_time.to_rfc3339(),
2113            answer_time: self.answer_time.map(|t| t.to_rfc3339()),
2114            ringing_time: self.ring_time.map(|t| t.to_rfc3339()),
2115            hangup_time: Utc::now().to_rfc3339(),
2116            extra,
2117            from: from.map(|f| f.into()),
2118            to: to.map(|f| f.into()),
2119            refer: Some(self.is_refer),
2120        }
2121    }
2122
2123    pub fn build_callrecord(
2124        &self,
2125        app_state: AppState,
2126        session_id: String,
2127        call_type: ActiveCallType,
2128    ) -> CallRecord {
2129        let option = self.option.clone().unwrap_or_default();
2130        let recorder = if option.recorder.is_some() {
2131            let recorder_file = app_state.get_recorder_file(&session_id);
2132            if std::path::Path::new(&recorder_file).exists() {
2133                let file_size = std::fs::metadata(&recorder_file)
2134                    .map(|m| m.len())
2135                    .unwrap_or(0);
2136                vec![crate::callrecord::CallRecordMedia {
2137                    track_id: session_id.clone(),
2138                    path: recorder_file,
2139                    size: file_size,
2140                    extra: None,
2141                }]
2142            } else {
2143                vec![]
2144            }
2145        } else {
2146            vec![]
2147        };
2148
2149        let dump_event_file = app_state.get_dump_events_file(&session_id);
2150        let dump_event_file = if std::path::Path::new(&dump_event_file).exists() {
2151            Some(dump_event_file)
2152        } else {
2153            None
2154        };
2155
2156        let refer_callrecord = self.refer_callstate.as_ref().and_then(|rc| {
2157            if let Ok(rc) = rc.try_read() {
2158                Some(Box::new(rc.build_callrecord(
2159                    app_state.clone(),
2160                    rc.session_id.clone(),
2161                    ActiveCallType::B2bua,
2162                )))
2163            } else {
2164                None
2165            }
2166        });
2167
2168        let caller = option.caller.clone().unwrap_or_default();
2169        let callee = option.callee.clone().unwrap_or_default();
2170
2171        CallRecord {
2172            option: Some(option),
2173            call_id: session_id,
2174            call_type,
2175            start_time: self.start_time,
2176            ring_time: self.ring_time.clone(),
2177            answer_time: self.answer_time.clone(),
2178            end_time: Utc::now(),
2179            caller,
2180            callee,
2181            hangup_reason: self.hangup_reason.clone(),
2182            hangup_messages: Vec::new(),
2183            status_code: self.last_status_code,
2184            extras: self.extras.clone(),
2185            dump_event_file,
2186            recorder,
2187            refer_callrecord,
2188        }
2189    }
2190}