1use crate::call::active_call::ActiveCallStateRef;
2use crate::callrecord::CallRecordHangupReason;
3use crate::event::EventSender;
4use crate::media::TrackId;
5use crate::media::stream::MediaStream;
6use crate::useragent::invitation::PendingDialog;
7use anyhow::Result;
8use chrono::Utc;
9use rsipstack::dialog::DialogId;
10use rsipstack::dialog::dialog::{
11 Dialog, DialogState, DialogStateReceiver, DialogStateSender, TerminatedReason,
12};
13use rsipstack::dialog::dialog_layer::DialogLayer;
14use rsipstack::dialog::invitation::InviteOption;
15use rsipstack::rsip_ext::RsipResponseExt;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tokio_util::sync::CancellationToken;
19use tracing::{info, warn};
20
21pub struct DialogStateReceiverGuard {
22 pub(super) dialog_layer: Arc<DialogLayer>,
23 pub(super) receiver: DialogStateReceiver,
24 pub(super) dialog_id: Option<DialogId>,
25}
26
27impl DialogStateReceiverGuard {
28 pub fn new(dialog_layer: Arc<DialogLayer>, receiver: DialogStateReceiver) -> Self {
29 Self {
30 dialog_layer,
31 receiver,
32 dialog_id: None,
33 }
34 }
35 pub async fn recv(&mut self) -> Option<DialogState> {
36 let state = self.receiver.recv().await;
37 if let Some(ref s) = state {
38 self.dialog_id = Some(s.id().clone());
39 }
40 state
41 }
42
43 fn take_dialog(&mut self) -> Option<Dialog> {
44 let id = match self.dialog_id.take() {
45 Some(id) => id,
46 None => return None,
47 };
48
49 match self.dialog_layer.get_dialog(&id) {
50 Some(dialog) => {
51 info!(%id, "dialog removed on drop");
52 self.dialog_layer.remove_dialog(&id);
53 return Some(dialog);
54 }
55 _ => {}
56 }
57 None
58 }
59
60 pub async fn drop_async(&mut self) {
61 if let Some(dialog) = self.take_dialog() {
62 if let Err(e) = dialog.hangup().await {
63 warn!(id=%dialog.id(), "error hanging up dialog on drop: {}", e);
64 }
65 }
66 }
67}
68
69impl Drop for DialogStateReceiverGuard {
70 fn drop(&mut self) {
71 if let Some(dialog) = self.take_dialog() {
72 tokio::spawn(async move {
73 if let Err(e) = dialog.hangup().await {
74 warn!(id=%dialog.id(), "error hanging up dialog on drop: {}", e);
75 }
76 });
77 }
78 }
79}
80
81pub(super) struct InviteDialogStates {
82 pub is_client: bool,
83 pub session_id: String,
84 pub track_id: TrackId,
85 pub cancel_token: CancellationToken,
86 pub event_sender: EventSender,
87 pub call_state: ActiveCallStateRef,
88 pub media_stream: Arc<MediaStream>,
89 pub terminated_reason: Option<TerminatedReason>,
90}
91
92impl InviteDialogStates {
93 pub(super) fn on_terminated(&mut self) {
94 let mut call_state_ref = match self.call_state.write() {
95 Ok(cs) => cs,
96 Err(_) => {
97 return;
98 }
99 };
100 let reason = &self.terminated_reason;
101 call_state_ref.last_status_code = match reason {
102 Some(TerminatedReason::UacCancel) => 487,
103 Some(TerminatedReason::UacBye) => 200,
104 Some(TerminatedReason::UacBusy) => 486,
105 Some(TerminatedReason::UasBye) => 200,
106 Some(TerminatedReason::UasBusy) => 486,
107 Some(TerminatedReason::UasDecline) => 603,
108 Some(TerminatedReason::UacOther(code)) => code.code(),
109 Some(TerminatedReason::UasOther(code)) => code.code(),
110 _ => 500, };
112
113 if call_state_ref.hangup_reason.is_none() {
114 call_state_ref.hangup_reason.replace(match reason {
115 Some(TerminatedReason::UacCancel) => CallRecordHangupReason::Canceled,
116 Some(TerminatedReason::UacBye) | Some(TerminatedReason::UacBusy) => {
117 CallRecordHangupReason::ByCaller
118 }
119 Some(TerminatedReason::UasBye) | Some(TerminatedReason::UasBusy) => {
120 CallRecordHangupReason::ByCallee
121 }
122 Some(TerminatedReason::UasDecline) => CallRecordHangupReason::ByCallee,
123 Some(TerminatedReason::UacOther(_)) => CallRecordHangupReason::ByCaller,
124 Some(TerminatedReason::UasOther(_)) => CallRecordHangupReason::ByCallee,
125 _ => CallRecordHangupReason::BySystem,
126 });
127 };
128 let initiator = match reason {
129 Some(TerminatedReason::UacCancel) => "caller".to_string(),
130 Some(TerminatedReason::UacBye) | Some(TerminatedReason::UacBusy) => {
131 "caller".to_string()
132 }
133 Some(TerminatedReason::UasBye)
134 | Some(TerminatedReason::UasBusy)
135 | Some(TerminatedReason::UasDecline) => "callee".to_string(),
136 _ => "system".to_string(),
137 };
138 self.event_sender
139 .send(crate::event::SessionEvent::TrackEnd {
140 track_id: self.track_id.clone(),
141 timestamp: crate::media::get_timestamp(),
142 duration: call_state_ref
143 .answer_time
144 .map(|t| (Utc::now() - t).num_milliseconds())
145 .unwrap_or_default() as u64,
146 ssrc: call_state_ref.ssrc,
147 play_id: None,
148 })
149 .ok();
150 let hangup_event =
151 call_state_ref.build_hangup_event(self.track_id.clone(), Some(initiator));
152 self.event_sender.send(hangup_event).ok();
153 }
154}
155
156impl Drop for InviteDialogStates {
157 fn drop(&mut self) {
158 self.on_terminated();
159 self.cancel_token.cancel();
160 }
161}
162
163impl DialogStateReceiverGuard {
164 pub(self) async fn dialog_event_loop(&mut self, states: &mut InviteDialogStates) -> Result<()> {
165 while let Some(event) = self.recv().await {
166 match event {
167 DialogState::Calling(dialog_id) => {
168 info!(session_id=states.session_id, %dialog_id, "dialog calling");
169 states
170 .call_state
171 .as_ref()
172 .write()
173 .map(|mut cs| cs.session_id = dialog_id.to_string())
174 .ok();
175 }
176 DialogState::Trying(_) => {}
177 DialogState::Early(dialog_id, resp) => {
178 let code = resp.status_code.code();
179 let body = resp.body();
180 let answer = String::from_utf8_lossy(body);
181 info!(session_id=states.session_id, %dialog_id, "dialog earlyanswer: \n{}", answer);
182
183 states
184 .call_state
185 .as_ref()
186 .write()
187 .map(|mut cs| {
188 if cs.ring_time.is_none() {
189 cs.ring_time.replace(Utc::now());
190 }
191 cs.last_status_code = code;
192 })
193 .ok();
194
195 if !states.is_client {
196 continue;
197 }
198
199 let refer = states
200 .call_state
201 .read()
202 .map(|cs| cs.is_refer)
203 .unwrap_or(false);
204
205 states
206 .event_sender
207 .send(crate::event::SessionEvent::Ringing {
208 track_id: states.track_id.clone(),
209 timestamp: crate::media::get_timestamp(),
210 early_media: !answer.is_empty(),
211 refer: Some(refer),
212 })?;
213
214 if answer.is_empty() {
215 continue;
216 }
217 states
218 .media_stream
219 .update_remote_description(&states.track_id, &answer.to_string())
220 .await?;
221 }
222 DialogState::Confirmed(dialog_id, _) => {
223 info!(session_id=states.session_id, %dialog_id, "dialog confirmed");
224 states
225 .call_state
226 .as_ref()
227 .write()
228 .map(|mut cs| {
229 cs.session_id = dialog_id.to_string();
230 cs.answer_time.replace(Utc::now());
231 cs.last_status_code = 200;
232 })
233 .ok();
234 }
235 DialogState::Info(dialog_id, req, tx_handle) => {
236 let body_str = String::from_utf8_lossy(req.body());
237 info!(session_id=states.session_id, %dialog_id, body=%body_str, "dialog info received");
238 if body_str.starts_with("Signal=") {
239 let digit = body_str.trim_start_matches("Signal=").chars().next();
240 if let Some(digit) = digit {
241 states.event_sender.send(crate::event::SessionEvent::Dtmf {
242 track_id: states.track_id.clone(),
243 timestamp: crate::media::get_timestamp(),
244 digit: digit.to_string(),
245 })?;
246 }
247 }
248 tx_handle.reply(rsip::StatusCode::OK).await.ok();
249 }
250 DialogState::Updated(dialog_id, _req, tx_handle) => {
251 info!(session_id = states.session_id, %dialog_id, "dialog update received");
252 if let Some(sdp_body) = _req.body().get(..) {
253 let sdp_str = String::from_utf8_lossy(sdp_body);
254 info!(session_id=states.session_id, %dialog_id, "updating remote description:\n{}", sdp_str);
255 states
256 .media_stream
257 .update_remote_description(&states.track_id, &sdp_str.to_string())
258 .await?;
259 }
260 tx_handle.reply(rsip::StatusCode::OK).await.ok();
261 }
262 DialogState::Options(dialog_id, _req, tx_handle) => {
263 info!(session_id = states.session_id, %dialog_id, "dialog options received");
264 tx_handle.reply(rsip::StatusCode::OK).await.ok();
265 }
266 DialogState::Terminated(dialog_id, reason) => {
267 info!(
268 session_id = states.session_id,
269 ?dialog_id,
270 ?reason,
271 "dialog terminated"
272 );
273 states.terminated_reason = Some(reason.clone());
274 return Ok(());
275 }
276 other_state => {
277 info!(
278 session_id = states.session_id,
279 %other_state,
280 "dialog received other state"
281 );
282 }
283 }
284 }
285 Ok(())
286 }
287
288 pub(super) async fn process_dialog(&mut self, mut states: InviteDialogStates) {
289 let token = states.cancel_token.clone();
290 tokio::select! {
291 _ = token.cancelled() => {
292 states.terminated_reason = Some(TerminatedReason::UacCancel);
293 }
294 _ = self.dialog_event_loop(&mut states) => {}
295 };
296 self.drop_async().await;
297 }
298}
299
300#[derive(Clone)]
301pub struct Invitation {
302 pub dialog_layer: Arc<DialogLayer>,
303 pub pending_dialogs: Arc<std::sync::Mutex<HashMap<String, PendingDialog>>>,
304}
305
306impl Invitation {
307 pub fn new(dialog_layer: Arc<DialogLayer>) -> Self {
308 Self {
309 dialog_layer,
310 pending_dialogs: Arc::new(std::sync::Mutex::new(HashMap::new())),
311 }
312 }
313
314 pub fn add_pending(&self, session_id: String, pending: PendingDialog) {
315 self.pending_dialogs
316 .lock()
317 .map(|mut ps| ps.insert(session_id, pending))
318 .ok();
319 }
320
321 pub fn get_pending_call(&self, session_id: &String) -> Option<PendingDialog> {
322 self.pending_dialogs
323 .lock()
324 .ok()
325 .map(|mut ps| ps.remove(session_id))
326 .flatten()
327 }
328
329 pub fn has_pending_call(&self, session_id: &str) -> Option<DialogId> {
330 self.pending_dialogs
331 .lock()
332 .ok()
333 .map(|ps| ps.get(session_id).map(|d| d.dialog.id()))
334 .flatten()
335 }
336
337 pub async fn hangup(
338 &self,
339 dialog_id: DialogId,
340 code: Option<rsip::StatusCode>,
341 reason: Option<String>,
342 ) -> Result<()> {
343 if let Some(call) = self.get_pending_call(&dialog_id.to_string()) {
344 call.dialog.reject(code, reason).ok();
345 call.token.cancel();
346 }
347 match self.dialog_layer.get_dialog(&dialog_id) {
348 Some(dialog) => {
349 self.dialog_layer.remove_dialog(&dialog_id);
350 dialog.hangup().await.ok();
351 }
352 None => {}
353 }
354 Ok(())
355 }
356
357 pub async fn reject(&self, dialog_id: DialogId) -> Result<()> {
358 if let Some(call) = self.get_pending_call(&dialog_id.to_string()) {
359 call.dialog.reject(None, None).ok();
360 call.token.cancel();
361 }
362 match self.dialog_layer.get_dialog(&dialog_id) {
363 Some(dialog) => {
364 self.dialog_layer.remove_dialog(&dialog_id);
365 dialog.hangup().await.ok();
366 }
367 None => {}
368 }
369 Ok(())
370 }
371
372 pub async fn invite(
373 &self,
374 invite_option: InviteOption,
375 state_sender: DialogStateSender,
376 ) -> Result<(DialogId, Option<Vec<u8>>), rsipstack::Error> {
377 let (dialog, resp) = self
378 .dialog_layer
379 .do_invite(invite_option, state_sender)
380 .await?;
381
382 let offer = match resp {
383 Some(resp) => match resp.status_code.kind() {
384 rsip::StatusCodeKind::Successful => {
385 let offer = resp.body.clone();
386 Some(offer)
387 }
388 _ => {
389 let reason = resp
390 .reason_phrase()
391 .unwrap_or(&resp.status_code.to_string())
392 .to_string();
393 return Err(rsipstack::Error::DialogError(
394 reason,
395 dialog.id(),
396 resp.status_code,
397 ));
398 }
399 },
400 None => {
401 return Err(rsipstack::Error::DialogError(
402 "no response received".to_string(),
403 dialog.id(),
404 rsip::StatusCode::NotAcceptableHere,
405 ));
406 }
407 };
408 Ok((dialog.id(), offer))
409 }
410}