active_call/call/
sip.rs

1use crate::call::active_call::ActiveCallStateRef;
2use crate::callrecord::CallRecordHangupReason;
3use crate::event::EventSender;
4use crate::media::TrackId;
5use crate::media::stream::MediaStream;
6use crate::useragent::invitation::PendingDialog;
7use anyhow::Result;
8use chrono::Utc;
9use rsipstack::dialog::DialogId;
10use rsipstack::dialog::dialog::{
11    Dialog, DialogState, DialogStateReceiver, DialogStateSender, TerminatedReason,
12};
13use rsipstack::dialog::dialog_layer::DialogLayer;
14use rsipstack::dialog::invitation::InviteOption;
15use rsipstack::rsip_ext::RsipResponseExt;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tokio_util::sync::CancellationToken;
19use tracing::{info, warn};
20
21pub struct DialogStateReceiverGuard {
22    pub(super) dialog_layer: Arc<DialogLayer>,
23    pub(super) receiver: DialogStateReceiver,
24    pub(super) dialog_id: Option<DialogId>,
25}
26
27impl DialogStateReceiverGuard {
28    pub fn new(dialog_layer: Arc<DialogLayer>, receiver: DialogStateReceiver) -> Self {
29        Self {
30            dialog_layer,
31            receiver,
32            dialog_id: None,
33        }
34    }
35    pub async fn recv(&mut self) -> Option<DialogState> {
36        let state = self.receiver.recv().await;
37        if let Some(ref s) = state {
38            self.dialog_id = Some(s.id().clone());
39        }
40        state
41    }
42
43    fn take_dialog(&mut self) -> Option<Dialog> {
44        let id = match self.dialog_id.take() {
45            Some(id) => id,
46            None => return None,
47        };
48
49        match self.dialog_layer.get_dialog(&id) {
50            Some(dialog) => {
51                info!(%id, "dialog removed on  drop");
52                self.dialog_layer.remove_dialog(&id);
53                return Some(dialog);
54            }
55            _ => {}
56        }
57        None
58    }
59
60    pub async fn drop_async(&mut self) {
61        if let Some(dialog) = self.take_dialog() {
62            if let Err(e) = dialog.hangup().await {
63                warn!(id=%dialog.id(), "error hanging up dialog on drop: {}", e);
64            }
65        }
66    }
67}
68
69impl Drop for DialogStateReceiverGuard {
70    fn drop(&mut self) {
71        if let Some(dialog) = self.take_dialog() {
72            tokio::spawn(async move {
73                if let Err(e) = dialog.hangup().await {
74                    warn!(id=%dialog.id(), "error hanging up dialog on drop: {}", e);
75                }
76            });
77        }
78    }
79}
80
81pub(super) struct InviteDialogStates {
82    pub is_client: bool,
83    pub session_id: String,
84    pub track_id: TrackId,
85    pub cancel_token: CancellationToken,
86    pub event_sender: EventSender,
87    pub call_state: ActiveCallStateRef,
88    pub media_stream: Arc<MediaStream>,
89    pub terminated_reason: Option<TerminatedReason>,
90}
91
92impl InviteDialogStates {
93    pub(super) fn on_terminated(&mut self) {
94        let mut call_state_ref = match self.call_state.write() {
95            Ok(cs) => cs,
96            Err(_) => {
97                return;
98            }
99        };
100        let reason = &self.terminated_reason;
101        call_state_ref.last_status_code = match reason {
102            Some(TerminatedReason::UacCancel) => 487,
103            Some(TerminatedReason::UacBye) => 200,
104            Some(TerminatedReason::UacBusy) => 486,
105            Some(TerminatedReason::UasBye) => 200,
106            Some(TerminatedReason::UasBusy) => 486,
107            Some(TerminatedReason::UasDecline) => 603,
108            Some(TerminatedReason::UacOther(code)) => code.code(),
109            Some(TerminatedReason::UasOther(code)) => code.code(),
110            _ => 500, // Default to internal server error
111        };
112
113        if call_state_ref.hangup_reason.is_none() {
114            call_state_ref.hangup_reason.replace(match reason {
115                Some(TerminatedReason::UacCancel) => CallRecordHangupReason::Canceled,
116                Some(TerminatedReason::UacBye) | Some(TerminatedReason::UacBusy) => {
117                    CallRecordHangupReason::ByCaller
118                }
119                Some(TerminatedReason::UasBye) | Some(TerminatedReason::UasBusy) => {
120                    CallRecordHangupReason::ByCallee
121                }
122                Some(TerminatedReason::UasDecline) => CallRecordHangupReason::ByCallee,
123                Some(TerminatedReason::UacOther(_)) => CallRecordHangupReason::ByCaller,
124                Some(TerminatedReason::UasOther(_)) => CallRecordHangupReason::ByCallee,
125                _ => CallRecordHangupReason::BySystem,
126            });
127        };
128        let initiator = match reason {
129            Some(TerminatedReason::UacCancel) => "caller".to_string(),
130            Some(TerminatedReason::UacBye) | Some(TerminatedReason::UacBusy) => {
131                "caller".to_string()
132            }
133            Some(TerminatedReason::UasBye)
134            | Some(TerminatedReason::UasBusy)
135            | Some(TerminatedReason::UasDecline) => "callee".to_string(),
136            _ => "system".to_string(),
137        };
138        self.event_sender
139            .send(crate::event::SessionEvent::TrackEnd {
140                track_id: self.track_id.clone(),
141                timestamp: crate::media::get_timestamp(),
142                duration: call_state_ref
143                    .answer_time
144                    .map(|t| (Utc::now() - t).num_milliseconds())
145                    .unwrap_or_default() as u64,
146                ssrc: call_state_ref.ssrc,
147                play_id: None,
148            })
149            .ok();
150        let hangup_event =
151            call_state_ref.build_hangup_event(self.track_id.clone(), Some(initiator));
152        self.event_sender.send(hangup_event).ok();
153    }
154}
155
156impl Drop for InviteDialogStates {
157    fn drop(&mut self) {
158        self.on_terminated();
159        self.cancel_token.cancel();
160    }
161}
162
163impl DialogStateReceiverGuard {
164    pub(self) async fn dialog_event_loop(&mut self, states: &mut InviteDialogStates) -> Result<()> {
165        while let Some(event) = self.recv().await {
166            match event {
167                DialogState::Calling(dialog_id) => {
168                    info!(session_id=states.session_id, %dialog_id, "dialog calling");
169                    states
170                        .call_state
171                        .as_ref()
172                        .write()
173                        .map(|mut cs| cs.session_id = dialog_id.to_string())
174                        .ok();
175                }
176                DialogState::Trying(_) => {}
177                DialogState::Early(dialog_id, resp) => {
178                    let code = resp.status_code.code();
179                    let body = resp.body();
180                    let answer = String::from_utf8_lossy(body);
181                    info!(session_id=states.session_id, %dialog_id,  "dialog earlyanswer: \n{}", answer);
182
183                    states
184                        .call_state
185                        .as_ref()
186                        .write()
187                        .map(|mut cs| {
188                            if cs.ring_time.is_none() {
189                                cs.ring_time.replace(Utc::now());
190                            }
191                            cs.last_status_code = code;
192                        })
193                        .ok();
194
195                    if !states.is_client {
196                        continue;
197                    }
198
199                    let refer = states
200                        .call_state
201                        .read()
202                        .map(|cs| cs.is_refer)
203                        .unwrap_or(false);
204
205                    states
206                        .event_sender
207                        .send(crate::event::SessionEvent::Ringing {
208                            track_id: states.track_id.clone(),
209                            timestamp: crate::media::get_timestamp(),
210                            early_media: !answer.is_empty(),
211                            refer: Some(refer),
212                        })?;
213
214                    if answer.is_empty() {
215                        continue;
216                    }
217                    states
218                        .media_stream
219                        .update_remote_description(&states.track_id, &answer.to_string())
220                        .await?;
221                }
222                DialogState::Confirmed(dialog_id, _) => {
223                    info!(session_id=states.session_id, %dialog_id, "dialog confirmed");
224                    states
225                        .call_state
226                        .as_ref()
227                        .write()
228                        .map(|mut cs| {
229                            cs.session_id = dialog_id.to_string();
230                            cs.answer_time.replace(Utc::now());
231                            cs.last_status_code = 200;
232                        })
233                        .ok();
234                }
235                DialogState::Info(dialog_id, req, tx_handle) => {
236                    let body_str = String::from_utf8_lossy(req.body());
237                    info!(session_id=states.session_id, %dialog_id, body=%body_str, "dialog info received");
238                    if body_str.starts_with("Signal=") {
239                        let digit = body_str.trim_start_matches("Signal=").chars().next();
240                        if let Some(digit) = digit {
241                            states.event_sender.send(crate::event::SessionEvent::Dtmf {
242                                track_id: states.track_id.clone(),
243                                timestamp: crate::media::get_timestamp(),
244                                digit: digit.to_string(),
245                            })?;
246                        }
247                    }
248                    tx_handle.reply(rsip::StatusCode::OK).await.ok();
249                }
250                DialogState::Updated(dialog_id, _req, tx_handle) => {
251                    info!(session_id = states.session_id, %dialog_id, "dialog update received");
252                    if let Some(sdp_body) = _req.body().get(..) {
253                        let sdp_str = String::from_utf8_lossy(sdp_body);
254                        info!(session_id=states.session_id, %dialog_id, "updating remote description:\n{}", sdp_str);
255                        states
256                            .media_stream
257                            .update_remote_description(&states.track_id, &sdp_str.to_string())
258                            .await?;
259                    }
260                    tx_handle.reply(rsip::StatusCode::OK).await.ok();
261                }
262                DialogState::Options(dialog_id, _req, tx_handle) => {
263                    info!(session_id = states.session_id, %dialog_id, "dialog options received");
264                    tx_handle.reply(rsip::StatusCode::OK).await.ok();
265                }
266                DialogState::Terminated(dialog_id, reason) => {
267                    info!(
268                        session_id = states.session_id,
269                        ?dialog_id,
270                        ?reason,
271                        "dialog terminated"
272                    );
273                    states.terminated_reason = Some(reason.clone());
274                    return Ok(());
275                }
276                other_state => {
277                    info!(
278                        session_id = states.session_id,
279                        %other_state,
280                        "dialog received other state"
281                    );
282                }
283            }
284        }
285        Ok(())
286    }
287
288    pub(super) async fn process_dialog(&mut self, mut states: InviteDialogStates) {
289        let token = states.cancel_token.clone();
290        tokio::select! {
291            _ = token.cancelled() => {
292                states.terminated_reason = Some(TerminatedReason::UacCancel);
293            }
294            _ = self.dialog_event_loop(&mut states) => {}
295        };
296        self.drop_async().await;
297    }
298}
299
300#[derive(Clone)]
301pub struct Invitation {
302    pub dialog_layer: Arc<DialogLayer>,
303    pub pending_dialogs: Arc<std::sync::Mutex<HashMap<String, PendingDialog>>>,
304}
305
306impl Invitation {
307    pub fn new(dialog_layer: Arc<DialogLayer>) -> Self {
308        Self {
309            dialog_layer,
310            pending_dialogs: Arc::new(std::sync::Mutex::new(HashMap::new())),
311        }
312    }
313
314    pub fn add_pending(&self, session_id: String, pending: PendingDialog) {
315        self.pending_dialogs
316            .lock()
317            .map(|mut ps| ps.insert(session_id, pending))
318            .ok();
319    }
320
321    pub fn get_pending_call(&self, session_id: &String) -> Option<PendingDialog> {
322        self.pending_dialogs
323            .lock()
324            .ok()
325            .map(|mut ps| ps.remove(session_id))
326            .flatten()
327    }
328
329    pub fn has_pending_call(&self, session_id: &str) -> Option<DialogId> {
330        self.pending_dialogs
331            .lock()
332            .ok()
333            .map(|ps| ps.get(session_id).map(|d| d.dialog.id()))
334            .flatten()
335    }
336
337    pub async fn hangup(
338        &self,
339        dialog_id: DialogId,
340        code: Option<rsip::StatusCode>,
341        reason: Option<String>,
342    ) -> Result<()> {
343        if let Some(call) = self.get_pending_call(&dialog_id.to_string()) {
344            call.dialog.reject(code, reason).ok();
345            call.token.cancel();
346        }
347        match self.dialog_layer.get_dialog(&dialog_id) {
348            Some(dialog) => {
349                self.dialog_layer.remove_dialog(&dialog_id);
350                dialog.hangup().await.ok();
351            }
352            None => {}
353        }
354        Ok(())
355    }
356
357    pub async fn reject(&self, dialog_id: DialogId) -> Result<()> {
358        if let Some(call) = self.get_pending_call(&dialog_id.to_string()) {
359            call.dialog.reject(None, None).ok();
360            call.token.cancel();
361        }
362        match self.dialog_layer.get_dialog(&dialog_id) {
363            Some(dialog) => {
364                self.dialog_layer.remove_dialog(&dialog_id);
365                dialog.hangup().await.ok();
366            }
367            None => {}
368        }
369        Ok(())
370    }
371
372    pub async fn invite(
373        &self,
374        invite_option: InviteOption,
375        state_sender: DialogStateSender,
376    ) -> Result<(DialogId, Option<Vec<u8>>), rsipstack::Error> {
377        let (dialog, resp) = self
378            .dialog_layer
379            .do_invite(invite_option, state_sender)
380            .await?;
381
382        let offer = match resp {
383            Some(resp) => match resp.status_code.kind() {
384                rsip::StatusCodeKind::Successful => {
385                    let offer = resp.body.clone();
386                    Some(offer)
387                }
388                _ => {
389                    let reason = resp
390                        .reason_phrase()
391                        .unwrap_or(&resp.status_code.to_string())
392                        .to_string();
393                    return Err(rsipstack::Error::DialogError(
394                        reason,
395                        dialog.id(),
396                        resp.status_code,
397                    ));
398                }
399            },
400            None => {
401                return Err(rsipstack::Error::DialogError(
402                    "no response received".to_string(),
403                    dialog.id(),
404                    rsip::StatusCode::NotAcceptableHere,
405                ));
406            }
407        };
408        Ok((dialog.id(), offer))
409    }
410}