Skip to main content

active_call/call/
sip.rs

1use crate::call::active_call::ActiveCallStateRef;
2use crate::callrecord::CallRecordHangupReason;
3use crate::event::EventSender;
4use crate::media::TrackId;
5use crate::media::stream::MediaStream;
6use crate::useragent::invitation::PendingDialog;
7use anyhow::Result;
8use chrono::Utc;
9use rsipstack::dialog::DialogId;
10use rsipstack::dialog::dialog::{
11    Dialog, DialogState, DialogStateReceiver, DialogStateSender, TerminatedReason,
12};
13use rsipstack::dialog::dialog_layer::DialogLayer;
14use rsipstack::dialog::invitation::InviteOption;
15use rsipstack::rsip_ext::RsipResponseExt;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tokio_util::sync::CancellationToken;
19use tracing::{info, warn};
20
21pub struct DialogStateReceiverGuard {
22    pub(super) dialog_layer: Arc<DialogLayer>,
23    pub(super) receiver: DialogStateReceiver,
24    pub(super) dialog_id: Option<DialogId>,
25}
26
27impl DialogStateReceiverGuard {
28    pub fn new(dialog_layer: Arc<DialogLayer>, receiver: DialogStateReceiver) -> Self {
29        Self {
30            dialog_layer,
31            receiver,
32            dialog_id: None,
33        }
34    }
35    pub async fn recv(&mut self) -> Option<DialogState> {
36        let state = self.receiver.recv().await;
37        if let Some(ref s) = state {
38            self.dialog_id = Some(s.id().clone());
39        }
40        state
41    }
42
43    fn take_dialog(&mut self) -> Option<Dialog> {
44        let id = match self.dialog_id.take() {
45            Some(id) => id,
46            None => return None,
47        };
48
49        match self.dialog_layer.get_dialog(&id) {
50            Some(dialog) => {
51                info!(%id, "dialog removed on  drop");
52                self.dialog_layer.remove_dialog(&id);
53                return Some(dialog);
54            }
55            _ => {}
56        }
57        None
58    }
59
60    pub async fn drop_async(&mut self) {
61        if let Some(dialog) = self.take_dialog() {
62            if let Err(e) = dialog.hangup().await {
63                warn!(id=%dialog.id(), "error hanging up dialog on drop: {}", e);
64            }
65        }
66    }
67}
68
69impl Drop for DialogStateReceiverGuard {
70    fn drop(&mut self) {
71        if let Some(dialog) = self.take_dialog() {
72            crate::spawn(async move {
73                if let Err(e) = dialog.hangup().await {
74                    warn!(id=%dialog.id(), "error hanging up dialog on drop: {}", e);
75                }
76            });
77        }
78    }
79}
80
81pub(super) struct InviteDialogStates {
82    pub is_client: bool,
83    pub session_id: String,
84    pub track_id: TrackId,
85    pub cancel_token: CancellationToken,
86    pub event_sender: EventSender,
87    pub call_state: ActiveCallStateRef,
88    pub media_stream: Arc<MediaStream>,
89    pub terminated_reason: Option<TerminatedReason>,
90    pub has_early_media: bool,
91}
92
93impl InviteDialogStates {
94    pub(super) fn on_terminated(&mut self) {
95        let mut call_state_ref = match self.call_state.try_write() {
96            Ok(cs) => cs,
97            Err(_) => {
98                return;
99            }
100        };
101        let reason = &self.terminated_reason;
102        call_state_ref.last_status_code = match reason {
103            Some(TerminatedReason::UacCancel) => 487,
104            Some(TerminatedReason::UacBye) => 200,
105            Some(TerminatedReason::UacBusy) => 486,
106            Some(TerminatedReason::UasBye) => 200,
107            Some(TerminatedReason::UasBusy) => 486,
108            Some(TerminatedReason::UasDecline) => 603,
109            Some(TerminatedReason::UacOther(code)) => code.code(),
110            Some(TerminatedReason::UasOther(code)) => code.code(),
111            _ => 500, // Default to internal server error
112        };
113
114        if call_state_ref.hangup_reason.is_none() {
115            call_state_ref.hangup_reason.replace(match reason {
116                Some(TerminatedReason::UacCancel) => CallRecordHangupReason::Canceled,
117                Some(TerminatedReason::UacBye) | Some(TerminatedReason::UacBusy) => {
118                    CallRecordHangupReason::ByCaller
119                }
120                Some(TerminatedReason::UasBye) | Some(TerminatedReason::UasBusy) => {
121                    CallRecordHangupReason::ByCallee
122                }
123                Some(TerminatedReason::UasDecline) => CallRecordHangupReason::ByCallee,
124                Some(TerminatedReason::UacOther(_)) => CallRecordHangupReason::ByCaller,
125                Some(TerminatedReason::UasOther(_)) => CallRecordHangupReason::ByCallee,
126                _ => CallRecordHangupReason::BySystem,
127            });
128        };
129        let initiator = match reason {
130            Some(TerminatedReason::UacCancel) => "caller".to_string(),
131            Some(TerminatedReason::UacBye) | Some(TerminatedReason::UacBusy) => {
132                "caller".to_string()
133            }
134            Some(TerminatedReason::UasBye)
135            | Some(TerminatedReason::UasBusy)
136            | Some(TerminatedReason::UasDecline) => "callee".to_string(),
137            _ => "system".to_string(),
138        };
139        self.event_sender
140            .send(crate::event::SessionEvent::TrackEnd {
141                track_id: self.track_id.clone(),
142                timestamp: crate::media::get_timestamp(),
143                duration: call_state_ref
144                    .answer_time
145                    .map(|t| (Utc::now() - t).num_milliseconds())
146                    .unwrap_or_default() as u64,
147                ssrc: call_state_ref.ssrc,
148                play_id: None,
149            })
150            .ok();
151        let hangup_event =
152            call_state_ref.build_hangup_event(self.track_id.clone(), Some(initiator));
153        self.event_sender.send(hangup_event).ok();
154    }
155}
156
157impl Drop for InviteDialogStates {
158    fn drop(&mut self) {
159        self.on_terminated();
160        self.cancel_token.cancel();
161    }
162}
163
164impl DialogStateReceiverGuard {
165    pub(self) async fn dialog_event_loop(&mut self, states: &mut InviteDialogStates) -> Result<()> {
166        while let Some(event) = self.recv().await {
167            match event {
168                DialogState::Calling(dialog_id) => {
169                    info!(session_id=states.session_id, %dialog_id, "dialog calling");
170                    states.call_state.write().await.session_id = dialog_id.to_string();
171                }
172                DialogState::Trying(_) => {}
173                DialogState::Early(dialog_id, resp) => {
174                    let code = resp.status_code.code();
175                    let body = resp.body();
176                    let answer = String::from_utf8_lossy(body);
177                    let has_sdp = !answer.is_empty();
178                    info!(session_id=states.session_id, %dialog_id, has_sdp=%has_sdp, "dialog early ({}): \n{}", code, answer);
179
180                    {
181                        let mut cs = states.call_state.write().await;
182                        if cs.ring_time.is_none() {
183                            cs.ring_time.replace(Utc::now());
184                        }
185                        cs.last_status_code = code;
186                    }
187
188                    if !states.is_client {
189                        continue;
190                    }
191
192                    let refer = states.call_state.read().await.is_refer;
193
194                    states
195                        .event_sender
196                        .send(crate::event::SessionEvent::Ringing {
197                            track_id: states.track_id.clone(),
198                            timestamp: crate::media::get_timestamp(),
199                            early_media: has_sdp,
200                            refer: Some(refer),
201                        })?;
202
203                    if has_sdp {
204                        states.has_early_media = true;
205                        states
206                            .media_stream
207                            .update_remote_description(&states.track_id, &answer.to_string())
208                            .await?;
209                    }
210                }
211                DialogState::Confirmed(dialog_id, msg) => {
212                    info!(session_id=states.session_id, %dialog_id, has_early_media=%states.has_early_media, "dialog confirmed");
213                    {
214                        let mut cs = states.call_state.write().await;
215                        cs.session_id = dialog_id.to_string();
216                        cs.answer_time.replace(Utc::now());
217                        cs.last_status_code = 200;
218                    }
219                    if states.is_client {
220                        let answer = String::from_utf8_lossy(msg.body());
221                        let answer = answer.trim();
222                        if !answer.is_empty() {
223                            if states.has_early_media {
224                                info!(
225                                    session_id = states.session_id,
226                                    "updating remote description with final answer after early media (force=true)"
227                                );
228                                // Force update when transitioning from early media (183) to confirmed (200 OK)
229                                // This ensures media parameters are properly updated even if SDP appears similar
230                                if let Err(e) = states
231                                    .media_stream
232                                    .update_remote_description_force(
233                                        &states.track_id,
234                                        &answer.to_string(),
235                                    )
236                                    .await
237                                {
238                                    tracing::warn!(
239                                        session_id = states.session_id,
240                                        "failed to force update remote description on confirmed: {}",
241                                        e
242                                    );
243                                }
244                            } else {
245                                if let Err(e) = states
246                                    .media_stream
247                                    .update_remote_description(
248                                        &states.track_id,
249                                        &answer.to_string(),
250                                    )
251                                    .await
252                                {
253                                    tracing::warn!(
254                                        session_id = states.session_id,
255                                        "failed to update remote description on confirmed: {}",
256                                        e
257                                    );
258                                }
259                            }
260                        }
261                    }
262                }
263                DialogState::Info(dialog_id, req, tx_handle) => {
264                    let body_str = String::from_utf8_lossy(req.body());
265                    info!(session_id=states.session_id, %dialog_id, body=%body_str, "dialog info received");
266                    if body_str.starts_with("Signal=") {
267                        let digit = body_str.trim_start_matches("Signal=").chars().next();
268                        if let Some(digit) = digit {
269                            states.event_sender.send(crate::event::SessionEvent::Dtmf {
270                                track_id: states.track_id.clone(),
271                                timestamp: crate::media::get_timestamp(),
272                                digit: digit.to_string(),
273                            })?;
274                        }
275                    }
276                    tx_handle.reply(rsip::StatusCode::OK).await.ok();
277                }
278                DialogState::Updated(dialog_id, _req, tx_handle) => {
279                    info!(session_id = states.session_id, %dialog_id, "dialog update received");
280                    let mut answer_sdp = None;
281                    if let Some(sdp_body) = _req.body().get(..) {
282                        let sdp_str = String::from_utf8_lossy(sdp_body);
283                        if !sdp_str.is_empty()
284                            && (_req.method == rsip::Method::Invite
285                                || _req.method == rsip::Method::Update)
286                        {
287                            info!(session_id=states.session_id, %dialog_id, method=%_req.method, "handling re-invite/update offer");
288                            match states
289                                .media_stream
290                                .handshake(&states.track_id, sdp_str.to_string(), None)
291                                .await
292                            {
293                                Ok(sdp) => answer_sdp = Some(sdp),
294                                Err(e) => {
295                                    warn!(
296                                        session_id = states.session_id,
297                                        "failed to handle re-invite: {}", e
298                                    );
299                                }
300                            }
301                        } else {
302                            info!(session_id=states.session_id, %dialog_id, "updating remote description:\n{}", sdp_str);
303                            states
304                                .media_stream
305                                .update_remote_description(&states.track_id, &sdp_str.to_string())
306                                .await?;
307                        }
308                    }
309
310                    if let Some(sdp) = answer_sdp {
311                        tx_handle
312                            .respond(
313                                rsip::StatusCode::OK,
314                                Some(vec![rsip::Header::ContentType(
315                                    "application/sdp".to_string().into(),
316                                )]),
317                                Some(sdp.into()),
318                            )
319                            .await
320                            .ok();
321                    } else {
322                        tx_handle.reply(rsip::StatusCode::OK).await.ok();
323                    }
324                }
325                DialogState::Options(dialog_id, _req, tx_handle) => {
326                    info!(session_id = states.session_id, %dialog_id, "dialog options received");
327                    tx_handle.reply(rsip::StatusCode::OK).await.ok();
328                }
329                DialogState::Terminated(dialog_id, reason) => {
330                    info!(
331                        session_id = states.session_id,
332                        ?dialog_id,
333                        ?reason,
334                        "dialog terminated"
335                    );
336                    states.terminated_reason = Some(reason.clone());
337                    return Ok(());
338                }
339                other_state => {
340                    info!(
341                        session_id = states.session_id,
342                        %other_state,
343                        "dialog received other state"
344                    );
345                }
346            }
347        }
348        Ok(())
349    }
350
351    pub(super) async fn process_dialog(&mut self, mut states: InviteDialogStates) {
352        let token = states.cancel_token.clone();
353        tokio::select! {
354            _ = token.cancelled() => {
355                states.terminated_reason = Some(TerminatedReason::UacCancel);
356            }
357            _ = self.dialog_event_loop(&mut states) => {}
358        };
359        self.drop_async().await;
360    }
361}
362
363#[derive(Clone)]
364pub struct Invitation {
365    pub dialog_layer: Arc<DialogLayer>,
366    pub pending_dialogs: Arc<std::sync::Mutex<HashMap<DialogId, PendingDialog>>>,
367}
368
369impl Invitation {
370    pub fn new(dialog_layer: Arc<DialogLayer>) -> Self {
371        Self {
372            dialog_layer,
373            pending_dialogs: Arc::new(std::sync::Mutex::new(HashMap::new())),
374        }
375    }
376
377    pub fn add_pending(&self, dialog_id: DialogId, pending: PendingDialog) {
378        self.pending_dialogs
379            .lock()
380            .map(|mut ps| ps.insert(dialog_id, pending))
381            .ok();
382    }
383
384    pub fn get_pending_call(&self, dialog_id: &DialogId) -> Option<PendingDialog> {
385        self.pending_dialogs
386            .lock()
387            .ok()
388            .and_then(|mut ps| ps.remove(dialog_id))
389    }
390
391    pub fn has_pending_call(&self, dialog_id: &DialogId) -> bool {
392        self.pending_dialogs
393            .lock()
394            .ok()
395            .map(|ps| ps.contains_key(dialog_id))
396            .unwrap_or(false)
397    }
398
399    pub fn find_dialog_id_by_session_id(&self, session_id: &str) -> Option<DialogId> {
400        self.pending_dialogs.lock().ok().and_then(|ps| {
401            ps.iter()
402                .find(|(id, _)| id.to_string() == session_id)
403                .map(|(id, _)| id.clone())
404        })
405    }
406
407    pub async fn hangup(
408        &self,
409        dialog_id: DialogId,
410        code: Option<rsip::StatusCode>,
411        reason: Option<String>,
412    ) -> Result<()> {
413        if let Some(call) = self.get_pending_call(&dialog_id) {
414            call.dialog.reject(code, reason).ok();
415            call.token.cancel();
416        }
417        match self.dialog_layer.get_dialog(&dialog_id) {
418            Some(dialog) => {
419                self.dialog_layer.remove_dialog(&dialog_id);
420                dialog.hangup().await.ok();
421            }
422            None => {}
423        }
424        Ok(())
425    }
426
427    pub async fn reject(&self, dialog_id: DialogId) -> Result<()> {
428        if let Some(call) = self.get_pending_call(&dialog_id) {
429            call.dialog.reject(None, None).ok();
430            call.token.cancel();
431        }
432        match self.dialog_layer.get_dialog(&dialog_id) {
433            Some(dialog) => {
434                self.dialog_layer.remove_dialog(&dialog_id);
435                dialog.hangup().await.ok();
436            }
437            None => {}
438        }
439        Ok(())
440    }
441
442    pub async fn invite(
443        &self,
444        invite_option: InviteOption,
445        state_sender: DialogStateSender,
446    ) -> Result<(DialogId, Option<Vec<u8>>), rsipstack::Error> {
447        let (dialog, resp) = self
448            .dialog_layer
449            .do_invite(invite_option, state_sender)
450            .await?;
451
452        let offer = match resp {
453            Some(resp) => match resp.status_code.kind() {
454                rsip::StatusCodeKind::Successful => {
455                    let offer = resp.body.clone();
456                    Some(offer)
457                }
458                _ => {
459                    let reason = resp
460                        .reason_phrase()
461                        .unwrap_or(&resp.status_code.to_string())
462                        .to_string();
463                    return Err(rsipstack::Error::DialogError(
464                        reason,
465                        dialog.id(),
466                        resp.status_code,
467                    ));
468                }
469            },
470            None => {
471                return Err(rsipstack::Error::DialogError(
472                    "no response received".to_string(),
473                    dialog.id(),
474                    rsip::StatusCode::NotAcceptableHere,
475                ));
476            }
477        };
478        Ok((dialog.id(), offer))
479    }
480}