// active_call/call/sip.rs

1use crate::call::active_call::ActiveCallStateRef;
2use crate::callrecord::CallRecordHangupReason;
3use crate::event::EventSender;
4use crate::media::TrackId;
5use crate::media::stream::MediaStream;
6use crate::useragent::invitation::PendingDialog;
7use anyhow::Result;
8use chrono::Utc;
9use rsipstack::dialog::DialogId;
10use rsipstack::dialog::dialog::{
11    Dialog, DialogState, DialogStateReceiver, DialogStateSender, TerminatedReason,
12};
13use rsipstack::dialog::dialog_layer::DialogLayer;
14use rsipstack::dialog::invitation::InviteOption;
15use rsipstack::rsip_ext::RsipResponseExt;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tokio_util::sync::CancellationToken;
19use tracing::{info, warn};
20
21pub struct DialogStateReceiverGuard {
22    pub(super) dialog_layer: Arc<DialogLayer>,
23    pub(super) receiver: DialogStateReceiver,
24    pub(super) dialog_id: Option<DialogId>,
25    pub(super) hangup_headers: Option<Vec<rsip::Header>>,
26}
27
28impl DialogStateReceiverGuard {
29    pub fn new(
30        dialog_layer: Arc<DialogLayer>,
31        receiver: DialogStateReceiver,
32        hangup_headers: Option<Vec<rsip::Header>>,
33    ) -> Self {
34        Self {
35            dialog_layer,
36            receiver,
37            dialog_id: None,
38            hangup_headers,
39        }
40    }
41    pub async fn recv(&mut self) -> Option<DialogState> {
42        let state = self.receiver.recv().await;
43        if let Some(ref s) = state {
44            self.dialog_id = Some(s.id().clone());
45        }
46        state
47    }
48
49    fn take_dialog(&mut self) -> Option<Dialog> {
50        let id = match self.dialog_id.take() {
51            Some(id) => id,
52            None => return None,
53        };
54
55        match self.dialog_layer.get_dialog(&id) {
56            Some(dialog) => {
57                info!(%id, "dialog removed on  drop");
58                self.dialog_layer.remove_dialog(&id);
59                return Some(dialog);
60            }
61            _ => {}
62        }
63        None
64    }
65
66    pub async fn drop_async(&mut self) {
67        if let Some(dialog) = self.take_dialog() {
68            if let Err(e) = dialog.hangup_with_headers(self.hangup_headers.take()).await {
69                warn!(id=%dialog.id(), "error hanging up dialog on drop: {}", e);
70            }
71        }
72    }
73}
74
75impl Drop for DialogStateReceiverGuard {
76    fn drop(&mut self) {
77        if let Some(dialog) = self.take_dialog() {
78            crate::spawn(async move {
79                if let Err(e) = dialog.hangup().await {
80                    warn!(id=%dialog.id(), "error hanging up dialog on drop: {}", e);
81                }
82            });
83        }
84    }
85}
86
87pub(super) struct InviteDialogStates {
88    pub is_client: bool,
89    pub session_id: String,
90    pub track_id: TrackId,
91    pub cancel_token: CancellationToken,
92    pub event_sender: EventSender,
93    pub call_state: ActiveCallStateRef,
94    pub media_stream: Arc<MediaStream>,
95    pub terminated_reason: Option<TerminatedReason>,
96    pub has_early_media: bool,
97}
98
99impl InviteDialogStates {
100    pub(super) fn on_terminated(&mut self) {
101        let mut call_state_ref = match self.call_state.try_write() {
102            Ok(cs) => cs,
103            Err(_) => {
104                return;
105            }
106        };
107        let reason = &self.terminated_reason;
108        call_state_ref.last_status_code = match reason {
109            Some(TerminatedReason::UacCancel) => 487,
110            Some(TerminatedReason::UacBye) => 200,
111            Some(TerminatedReason::UacBusy) => 486,
112            Some(TerminatedReason::UasBye) => 200,
113            Some(TerminatedReason::UasBusy) => 486,
114            Some(TerminatedReason::UasDecline) => 603,
115            Some(TerminatedReason::UacOther(code)) => code.code(),
116            Some(TerminatedReason::UasOther(code)) => code.code(),
117            _ => 500, // Default to internal server error
118        };
119
120        if call_state_ref.hangup_reason.is_none() {
121            call_state_ref.hangup_reason.replace(match reason {
122                Some(TerminatedReason::UacCancel) => CallRecordHangupReason::Canceled,
123                Some(TerminatedReason::UacBye) | Some(TerminatedReason::UacBusy) => {
124                    CallRecordHangupReason::ByCaller
125                }
126                Some(TerminatedReason::UasBye) | Some(TerminatedReason::UasBusy) => {
127                    CallRecordHangupReason::ByCallee
128                }
129                Some(TerminatedReason::UasDecline) => CallRecordHangupReason::ByCallee,
130                Some(TerminatedReason::UacOther(_)) => CallRecordHangupReason::ByCaller,
131                Some(TerminatedReason::UasOther(_)) => CallRecordHangupReason::ByCallee,
132                _ => CallRecordHangupReason::BySystem,
133            });
134        };
135        let initiator = match reason {
136            Some(TerminatedReason::UacCancel) => "caller".to_string(),
137            Some(TerminatedReason::UacBye) | Some(TerminatedReason::UacBusy) => {
138                "caller".to_string()
139            }
140            Some(TerminatedReason::UasBye)
141            | Some(TerminatedReason::UasBusy)
142            | Some(TerminatedReason::UasDecline) => "callee".to_string(),
143            _ => "system".to_string(),
144        };
145        self.event_sender
146            .send(crate::event::SessionEvent::TrackEnd {
147                track_id: self.track_id.clone(),
148                timestamp: crate::media::get_timestamp(),
149                duration: call_state_ref
150                    .answer_time
151                    .map(|t| (Utc::now() - t).num_milliseconds())
152                    .unwrap_or_default() as u64,
153                ssrc: call_state_ref.ssrc,
154                play_id: None,
155            })
156            .ok();
157        let hangup_event =
158            call_state_ref.build_hangup_event(self.track_id.clone(), Some(initiator));
159        self.event_sender.send(hangup_event).ok();
160    }
161}
162
163impl Drop for InviteDialogStates {
164    fn drop(&mut self) {
165        self.on_terminated();
166        self.cancel_token.cancel();
167    }
168}
169
170impl DialogStateReceiverGuard {
171    pub(self) async fn dialog_event_loop(&mut self, states: &mut InviteDialogStates) -> Result<()> {
172        while let Some(event) = self.recv().await {
173            match event {
174                DialogState::Calling(dialog_id) => {
175                    info!(session_id=states.session_id, %dialog_id, "dialog calling");
176                    states.call_state.write().await.session_id = dialog_id.to_string();
177                }
178                DialogState::Trying(_) => {}
179                DialogState::Early(dialog_id, resp) => {
180                    let code = resp.status_code.code();
181                    let body = resp.body();
182                    let answer = String::from_utf8_lossy(body);
183                    let has_sdp = !answer.is_empty();
184                    info!(session_id=states.session_id, %dialog_id, has_sdp=%has_sdp, "dialog early ({}): \n{}", code, answer);
185
186                    {
187                        let mut cs = states.call_state.write().await;
188                        if cs.ring_time.is_none() {
189                            cs.ring_time.replace(Utc::now());
190                        }
191                        cs.last_status_code = code;
192                    }
193
194                    if !states.is_client {
195                        continue;
196                    }
197
198                    let refer = states.call_state.read().await.is_refer;
199
200                    states
201                        .event_sender
202                        .send(crate::event::SessionEvent::Ringing {
203                            track_id: states.track_id.clone(),
204                            timestamp: crate::media::get_timestamp(),
205                            early_media: has_sdp,
206                            refer: Some(refer),
207                        })?;
208
209                    if has_sdp {
210                        states.has_early_media = true;
211                        states
212                            .media_stream
213                            .update_remote_description(&states.track_id, &answer.to_string())
214                            .await?;
215                    }
216                }
217                DialogState::Confirmed(dialog_id, msg) => {
218                    info!(session_id=states.session_id, %dialog_id, has_early_media=%states.has_early_media, "dialog confirmed");
219                    {
220                        let mut cs = states.call_state.write().await;
221                        cs.session_id = dialog_id.to_string();
222                        cs.answer_time.replace(Utc::now());
223                        cs.last_status_code = 200;
224                    }
225                    if states.is_client {
226                        let answer = String::from_utf8_lossy(msg.body());
227                        let answer = answer.trim();
228                        if !answer.is_empty() {
229                            if states.has_early_media {
230                                info!(
231                                    session_id = states.session_id,
232                                    "updating remote description with final answer after early media (force=true)"
233                                );
234                                // Force update when transitioning from early media (183) to confirmed (200 OK)
235                                // This ensures media parameters are properly updated even if SDP appears similar
236                                if let Err(e) = states
237                                    .media_stream
238                                    .update_remote_description_force(
239                                        &states.track_id,
240                                        &answer.to_string(),
241                                    )
242                                    .await
243                                {
244                                    tracing::warn!(
245                                        session_id = states.session_id,
246                                        "failed to force update remote description on confirmed: {}",
247                                        e
248                                    );
249                                }
250                            } else {
251                                if let Err(e) = states
252                                    .media_stream
253                                    .update_remote_description(
254                                        &states.track_id,
255                                        &answer.to_string(),
256                                    )
257                                    .await
258                                {
259                                    tracing::warn!(
260                                        session_id = states.session_id,
261                                        "failed to update remote description on confirmed: {}",
262                                        e
263                                    );
264                                }
265                            }
266                        }
267                    }
268                }
269                DialogState::Info(dialog_id, req, tx_handle) => {
270                    let body_str = String::from_utf8_lossy(req.body());
271                    info!(session_id=states.session_id, %dialog_id, body=%body_str, "dialog info received");
272                    if body_str.starts_with("Signal=") {
273                        let digit = body_str.trim_start_matches("Signal=").chars().next();
274                        if let Some(digit) = digit {
275                            states.event_sender.send(crate::event::SessionEvent::Dtmf {
276                                track_id: states.track_id.clone(),
277                                timestamp: crate::media::get_timestamp(),
278                                digit: digit.to_string(),
279                            })?;
280                        }
281                    }
282                    tx_handle.reply(rsip::StatusCode::OK).await.ok();
283                }
284                DialogState::Updated(dialog_id, _req, tx_handle) => {
285                    info!(session_id = states.session_id, %dialog_id, "dialog update received");
286                    let mut answer_sdp = None;
287                    if let Some(sdp_body) = _req.body().get(..) {
288                        let sdp_str = String::from_utf8_lossy(sdp_body);
289                        if !sdp_str.is_empty()
290                            && (_req.method == rsip::Method::Invite
291                                || _req.method == rsip::Method::Update)
292                        {
293                            info!(session_id=states.session_id, %dialog_id, method=%_req.method, "handling re-invite/update offer");
294                            match states
295                                .media_stream
296                                .handshake(&states.track_id, sdp_str.to_string(), None)
297                                .await
298                            {
299                                Ok(sdp) => answer_sdp = Some(sdp),
300                                Err(e) => {
301                                    warn!(
302                                        session_id = states.session_id,
303                                        "failed to handle re-invite: {}", e
304                                    );
305                                }
306                            }
307                        } else {
308                            info!(session_id=states.session_id, %dialog_id, "updating remote description:\n{}", sdp_str);
309                            states
310                                .media_stream
311                                .update_remote_description(&states.track_id, &sdp_str.to_string())
312                                .await?;
313                        }
314                    }
315
316                    if let Some(sdp) = answer_sdp {
317                        tx_handle
318                            .respond(
319                                rsip::StatusCode::OK,
320                                Some(vec![rsip::Header::ContentType(
321                                    "application/sdp".to_string().into(),
322                                )]),
323                                Some(sdp.into()),
324                            )
325                            .await
326                            .ok();
327                    } else {
328                        tx_handle.reply(rsip::StatusCode::OK).await.ok();
329                    }
330                }
331                DialogState::Options(dialog_id, _req, tx_handle) => {
332                    info!(session_id = states.session_id, %dialog_id, "dialog options received");
333                    tx_handle.reply(rsip::StatusCode::OK).await.ok();
334                }
335                DialogState::Terminated(dialog_id, reason) => {
336                    info!(
337                        session_id = states.session_id,
338                        ?dialog_id,
339                        ?reason,
340                        "dialog terminated"
341                    );
342                    states.terminated_reason = Some(reason.clone());
343                    return Ok(());
344                }
345                other_state => {
346                    info!(
347                        session_id = states.session_id,
348                        %other_state,
349                        "dialog received other state"
350                    );
351                }
352            }
353        }
354        Ok(())
355    }
356
357    pub(super) async fn process_dialog(&mut self, mut states: InviteDialogStates) {
358        let token = states.cancel_token.clone();
359        tokio::select! {
360            _ = token.cancelled() => {
361                states.terminated_reason = Some(TerminatedReason::UacCancel);
362            }
363            _ = self.dialog_event_loop(&mut states) => {}
364        };
365
366        // Update hangup headers from ActiveCallState if available
367        {
368            let state = states.call_state.read().await;
369            if let Some(extras) = &state.extras {
370                if let Some(h_val) = extras.get("_sip_headers") {
371                    if let Ok(headers_map) =
372                        serde_json::from_value::<HashMap<String, String>>(h_val.clone())
373                    {
374                        let mut headers = Vec::new();
375                        for (k, v) in headers_map {
376                            headers.push(rsip::Header::Other(k.into(), v.into()));
377                        }
378                        if !headers.is_empty() {
379                            if let Some(existing) = &mut self.hangup_headers {
380                                existing.extend(headers);
381                            } else {
382                                self.hangup_headers = Some(headers);
383                            }
384                        }
385                    }
386                }
387            }
388        }
389
390        self.drop_async().await;
391    }
392}
393
394#[derive(Clone)]
395pub struct Invitation {
396    pub dialog_layer: Arc<DialogLayer>,
397    pub pending_dialogs: Arc<std::sync::Mutex<HashMap<DialogId, PendingDialog>>>,
398}
399
400impl Invitation {
401    pub fn new(dialog_layer: Arc<DialogLayer>) -> Self {
402        Self {
403            dialog_layer,
404            pending_dialogs: Arc::new(std::sync::Mutex::new(HashMap::new())),
405        }
406    }
407
408    pub fn add_pending(&self, dialog_id: DialogId, pending: PendingDialog) {
409        self.pending_dialogs
410            .lock()
411            .map(|mut ps| ps.insert(dialog_id, pending))
412            .ok();
413    }
414
415    pub fn get_pending_call(&self, dialog_id: &DialogId) -> Option<PendingDialog> {
416        self.pending_dialogs
417            .lock()
418            .ok()
419            .and_then(|mut ps| ps.remove(dialog_id))
420    }
421
422    pub fn has_pending_call(&self, dialog_id: &DialogId) -> bool {
423        self.pending_dialogs
424            .lock()
425            .ok()
426            .map(|ps| ps.contains_key(dialog_id))
427            .unwrap_or(false)
428    }
429
430    pub fn find_dialog_id_by_session_id(&self, session_id: &str) -> Option<DialogId> {
431        self.pending_dialogs.lock().ok().and_then(|ps| {
432            ps.iter()
433                .find(|(id, _)| id.to_string() == session_id)
434                .map(|(id, _)| id.clone())
435        })
436    }
437
438    pub async fn hangup(
439        &self,
440        dialog_id: DialogId,
441        code: Option<rsip::StatusCode>,
442        reason: Option<String>,
443    ) -> Result<()> {
444        if let Some(call) = self.get_pending_call(&dialog_id) {
445            call.dialog.reject(code, reason).ok();
446            call.token.cancel();
447        }
448        match self.dialog_layer.get_dialog(&dialog_id) {
449            Some(dialog) => {
450                self.dialog_layer.remove_dialog(&dialog_id);
451                dialog.hangup().await.ok();
452            }
453            None => {}
454        }
455        Ok(())
456    }
457
458    pub async fn reject(&self, dialog_id: DialogId) -> Result<()> {
459        if let Some(call) = self.get_pending_call(&dialog_id) {
460            call.dialog.reject(None, None).ok();
461            call.token.cancel();
462        }
463        match self.dialog_layer.get_dialog(&dialog_id) {
464            Some(dialog) => {
465                self.dialog_layer.remove_dialog(&dialog_id);
466                dialog.hangup().await.ok();
467            }
468            None => {}
469        }
470        Ok(())
471    }
472
473    pub async fn invite(
474        &self,
475        invite_option: InviteOption,
476        state_sender: DialogStateSender,
477    ) -> Result<(DialogId, Option<Vec<u8>>), rsipstack::Error> {
478        let (dialog, resp) = self
479            .dialog_layer
480            .do_invite(invite_option, state_sender)
481            .await?;
482
483        let offer = match resp {
484            Some(resp) => match resp.status_code.kind() {
485                rsip::StatusCodeKind::Successful => {
486                    let offer = resp.body.clone();
487                    Some(offer)
488                }
489                _ => {
490                    let reason = resp
491                        .reason_phrase()
492                        .unwrap_or(&resp.status_code.to_string())
493                        .to_string();
494                    return Err(rsipstack::Error::DialogError(
495                        reason,
496                        dialog.id(),
497                        resp.status_code,
498                    ));
499                }
500            },
501            None => {
502                return Err(rsipstack::Error::DialogError(
503                    "no response received".to_string(),
504                    dialog.id(),
505                    rsip::StatusCode::NotAcceptableHere,
506                ));
507            }
508        };
509        Ok((dialog.id(), offer))
510    }
511}