1use crate::call::active_call::ActiveCallStateRef;
2use crate::callrecord::CallRecordHangupReason;
3use crate::event::EventSender;
4use crate::media::TrackId;
5use crate::media::stream::MediaStream;
6use crate::useragent::invitation::PendingDialog;
7use anyhow::Result;
8use chrono::Utc;
9use rsipstack::dialog::DialogId;
10use rsipstack::dialog::dialog::{
11 Dialog, DialogState, DialogStateReceiver, DialogStateSender, TerminatedReason,
12};
13use rsipstack::dialog::dialog_layer::DialogLayer;
14use rsipstack::dialog::invitation::InviteOption;
15use rsipstack::rsip_ext::RsipResponseExt;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tokio_util::sync::CancellationToken;
19use tracing::{info, warn};
20
21pub struct DialogStateReceiverGuard {
22 pub(super) dialog_layer: Arc<DialogLayer>,
23 pub(super) receiver: DialogStateReceiver,
24 pub(super) dialog_id: Option<DialogId>,
25}
26
27impl DialogStateReceiverGuard {
28 pub fn new(dialog_layer: Arc<DialogLayer>, receiver: DialogStateReceiver) -> Self {
29 Self {
30 dialog_layer,
31 receiver,
32 dialog_id: None,
33 }
34 }
35 pub async fn recv(&mut self) -> Option<DialogState> {
36 let state = self.receiver.recv().await;
37 if let Some(ref s) = state {
38 self.dialog_id = Some(s.id().clone());
39 }
40 state
41 }
42
43 fn take_dialog(&mut self) -> Option<Dialog> {
44 let id = match self.dialog_id.take() {
45 Some(id) => id,
46 None => return None,
47 };
48
49 match self.dialog_layer.get_dialog(&id) {
50 Some(dialog) => {
51 info!(%id, "dialog removed on drop");
52 self.dialog_layer.remove_dialog(&id);
53 return Some(dialog);
54 }
55 _ => {}
56 }
57 None
58 }
59
60 pub async fn drop_async(&mut self) {
61 if let Some(dialog) = self.take_dialog() {
62 if let Err(e) = dialog.hangup().await {
63 warn!(id=%dialog.id(), "error hanging up dialog on drop: {}", e);
64 }
65 }
66 }
67}
68
69impl Drop for DialogStateReceiverGuard {
70 fn drop(&mut self) {
71 if let Some(dialog) = self.take_dialog() {
72 crate::spawn(async move {
73 if let Err(e) = dialog.hangup().await {
74 warn!(id=%dialog.id(), "error hanging up dialog on drop: {}", e);
75 }
76 });
77 }
78 }
79}
80
81pub(super) struct InviteDialogStates {
82 pub is_client: bool,
83 pub session_id: String,
84 pub track_id: TrackId,
85 pub cancel_token: CancellationToken,
86 pub event_sender: EventSender,
87 pub call_state: ActiveCallStateRef,
88 pub media_stream: Arc<MediaStream>,
89 pub terminated_reason: Option<TerminatedReason>,
90 pub has_early_media: bool,
91}
92
93impl InviteDialogStates {
94 pub(super) fn on_terminated(&mut self) {
95 let mut call_state_ref = match self.call_state.try_write() {
96 Ok(cs) => cs,
97 Err(_) => {
98 return;
99 }
100 };
101 let reason = &self.terminated_reason;
102 call_state_ref.last_status_code = match reason {
103 Some(TerminatedReason::UacCancel) => 487,
104 Some(TerminatedReason::UacBye) => 200,
105 Some(TerminatedReason::UacBusy) => 486,
106 Some(TerminatedReason::UasBye) => 200,
107 Some(TerminatedReason::UasBusy) => 486,
108 Some(TerminatedReason::UasDecline) => 603,
109 Some(TerminatedReason::UacOther(code)) => code.code(),
110 Some(TerminatedReason::UasOther(code)) => code.code(),
111 _ => 500, };
113
114 if call_state_ref.hangup_reason.is_none() {
115 call_state_ref.hangup_reason.replace(match reason {
116 Some(TerminatedReason::UacCancel) => CallRecordHangupReason::Canceled,
117 Some(TerminatedReason::UacBye) | Some(TerminatedReason::UacBusy) => {
118 CallRecordHangupReason::ByCaller
119 }
120 Some(TerminatedReason::UasBye) | Some(TerminatedReason::UasBusy) => {
121 CallRecordHangupReason::ByCallee
122 }
123 Some(TerminatedReason::UasDecline) => CallRecordHangupReason::ByCallee,
124 Some(TerminatedReason::UacOther(_)) => CallRecordHangupReason::ByCaller,
125 Some(TerminatedReason::UasOther(_)) => CallRecordHangupReason::ByCallee,
126 _ => CallRecordHangupReason::BySystem,
127 });
128 };
129 let initiator = match reason {
130 Some(TerminatedReason::UacCancel) => "caller".to_string(),
131 Some(TerminatedReason::UacBye) | Some(TerminatedReason::UacBusy) => {
132 "caller".to_string()
133 }
134 Some(TerminatedReason::UasBye)
135 | Some(TerminatedReason::UasBusy)
136 | Some(TerminatedReason::UasDecline) => "callee".to_string(),
137 _ => "system".to_string(),
138 };
139 self.event_sender
140 .send(crate::event::SessionEvent::TrackEnd {
141 track_id: self.track_id.clone(),
142 timestamp: crate::media::get_timestamp(),
143 duration: call_state_ref
144 .answer_time
145 .map(|t| (Utc::now() - t).num_milliseconds())
146 .unwrap_or_default() as u64,
147 ssrc: call_state_ref.ssrc,
148 play_id: None,
149 })
150 .ok();
151 let hangup_event =
152 call_state_ref.build_hangup_event(self.track_id.clone(), Some(initiator));
153 self.event_sender.send(hangup_event).ok();
154 }
155}
156
157impl Drop for InviteDialogStates {
158 fn drop(&mut self) {
159 self.on_terminated();
160 self.cancel_token.cancel();
161 }
162}
163
164impl DialogStateReceiverGuard {
165 pub(self) async fn dialog_event_loop(&mut self, states: &mut InviteDialogStates) -> Result<()> {
166 while let Some(event) = self.recv().await {
167 match event {
168 DialogState::Calling(dialog_id) => {
169 info!(session_id=states.session_id, %dialog_id, "dialog calling");
170 states.call_state.write().await.session_id = dialog_id.to_string();
171 }
172 DialogState::Trying(_) => {}
173 DialogState::Early(dialog_id, resp) => {
174 let code = resp.status_code.code();
175 let body = resp.body();
176 let answer = String::from_utf8_lossy(body);
177 let has_sdp = !answer.is_empty();
178 info!(session_id=states.session_id, %dialog_id, has_sdp=%has_sdp, "dialog early ({}): \n{}", code, answer);
179
180 {
181 let mut cs = states.call_state.write().await;
182 if cs.ring_time.is_none() {
183 cs.ring_time.replace(Utc::now());
184 }
185 cs.last_status_code = code;
186 }
187
188 if !states.is_client {
189 continue;
190 }
191
192 let refer = states.call_state.read().await.is_refer;
193
194 states
195 .event_sender
196 .send(crate::event::SessionEvent::Ringing {
197 track_id: states.track_id.clone(),
198 timestamp: crate::media::get_timestamp(),
199 early_media: has_sdp,
200 refer: Some(refer),
201 })?;
202
203 if has_sdp {
204 states.has_early_media = true;
205 states
206 .media_stream
207 .update_remote_description(&states.track_id, &answer.to_string())
208 .await?;
209 }
210 }
211 DialogState::Confirmed(dialog_id, msg) => {
212 info!(session_id=states.session_id, %dialog_id, has_early_media=%states.has_early_media, "dialog confirmed");
213 {
214 let mut cs = states.call_state.write().await;
215 cs.session_id = dialog_id.to_string();
216 cs.answer_time.replace(Utc::now());
217 cs.last_status_code = 200;
218 }
219 if states.is_client {
220 let answer = String::from_utf8_lossy(msg.body());
221 let answer = answer.trim();
222 if !answer.is_empty() {
223 if states.has_early_media {
224 info!(
225 session_id = states.session_id,
226 "updating remote description with final answer after early media (force=true)"
227 );
228 if let Err(e) = states
231 .media_stream
232 .update_remote_description_force(
233 &states.track_id,
234 &answer.to_string(),
235 )
236 .await
237 {
238 tracing::warn!(
239 session_id = states.session_id,
240 "failed to force update remote description on confirmed: {}",
241 e
242 );
243 }
244 } else {
245 if let Err(e) = states
246 .media_stream
247 .update_remote_description(
248 &states.track_id,
249 &answer.to_string(),
250 )
251 .await
252 {
253 tracing::warn!(
254 session_id = states.session_id,
255 "failed to update remote description on confirmed: {}",
256 e
257 );
258 }
259 }
260 }
261 }
262 }
263 DialogState::Info(dialog_id, req, tx_handle) => {
264 let body_str = String::from_utf8_lossy(req.body());
265 info!(session_id=states.session_id, %dialog_id, body=%body_str, "dialog info received");
266 if body_str.starts_with("Signal=") {
267 let digit = body_str.trim_start_matches("Signal=").chars().next();
268 if let Some(digit) = digit {
269 states.event_sender.send(crate::event::SessionEvent::Dtmf {
270 track_id: states.track_id.clone(),
271 timestamp: crate::media::get_timestamp(),
272 digit: digit.to_string(),
273 })?;
274 }
275 }
276 tx_handle.reply(rsip::StatusCode::OK).await.ok();
277 }
278 DialogState::Updated(dialog_id, _req, tx_handle) => {
279 info!(session_id = states.session_id, %dialog_id, "dialog update received");
280 if let Some(sdp_body) = _req.body().get(..) {
281 let sdp_str = String::from_utf8_lossy(sdp_body);
282 info!(session_id=states.session_id, %dialog_id, "updating remote description:\n{}", sdp_str);
283 states
284 .media_stream
285 .update_remote_description(&states.track_id, &sdp_str.to_string())
286 .await?;
287 }
288 tx_handle.reply(rsip::StatusCode::OK).await.ok();
289 }
290 DialogState::Options(dialog_id, _req, tx_handle) => {
291 info!(session_id = states.session_id, %dialog_id, "dialog options received");
292 tx_handle.reply(rsip::StatusCode::OK).await.ok();
293 }
294 DialogState::Terminated(dialog_id, reason) => {
295 info!(
296 session_id = states.session_id,
297 ?dialog_id,
298 ?reason,
299 "dialog terminated"
300 );
301 states.terminated_reason = Some(reason.clone());
302 return Ok(());
303 }
304 other_state => {
305 info!(
306 session_id = states.session_id,
307 %other_state,
308 "dialog received other state"
309 );
310 }
311 }
312 }
313 Ok(())
314 }
315
316 pub(super) async fn process_dialog(&mut self, mut states: InviteDialogStates) {
317 let token = states.cancel_token.clone();
318 tokio::select! {
319 _ = token.cancelled() => {
320 states.terminated_reason = Some(TerminatedReason::UacCancel);
321 }
322 _ = self.dialog_event_loop(&mut states) => {}
323 };
324 self.drop_async().await;
325 }
326}
327
328#[derive(Clone)]
329pub struct Invitation {
330 pub dialog_layer: Arc<DialogLayer>,
331 pub pending_dialogs: Arc<std::sync::Mutex<HashMap<DialogId, PendingDialog>>>,
332}
333
334impl Invitation {
335 pub fn new(dialog_layer: Arc<DialogLayer>) -> Self {
336 Self {
337 dialog_layer,
338 pending_dialogs: Arc::new(std::sync::Mutex::new(HashMap::new())),
339 }
340 }
341
342 pub fn add_pending(&self, dialog_id: DialogId, pending: PendingDialog) {
343 self.pending_dialogs
344 .lock()
345 .map(|mut ps| ps.insert(dialog_id, pending))
346 .ok();
347 }
348
349 pub fn get_pending_call(&self, dialog_id: &DialogId) -> Option<PendingDialog> {
350 self.pending_dialogs
351 .lock()
352 .ok()
353 .and_then(|mut ps| ps.remove(dialog_id))
354 }
355
356 pub fn has_pending_call(&self, dialog_id: &DialogId) -> bool {
357 self.pending_dialogs
358 .lock()
359 .ok()
360 .map(|ps| ps.contains_key(dialog_id))
361 .unwrap_or(false)
362 }
363
364 pub fn find_dialog_id_by_session_id(&self, session_id: &str) -> Option<DialogId> {
365 self.pending_dialogs.lock().ok().and_then(|ps| {
366 ps.iter()
367 .find(|(id, _)| id.to_string() == session_id)
368 .map(|(id, _)| id.clone())
369 })
370 }
371
372 pub async fn hangup(
373 &self,
374 dialog_id: DialogId,
375 code: Option<rsip::StatusCode>,
376 reason: Option<String>,
377 ) -> Result<()> {
378 if let Some(call) = self.get_pending_call(&dialog_id) {
379 call.dialog.reject(code, reason).ok();
380 call.token.cancel();
381 }
382 match self.dialog_layer.get_dialog(&dialog_id) {
383 Some(dialog) => {
384 self.dialog_layer.remove_dialog(&dialog_id);
385 dialog.hangup().await.ok();
386 }
387 None => {}
388 }
389 Ok(())
390 }
391
392 pub async fn reject(&self, dialog_id: DialogId) -> Result<()> {
393 if let Some(call) = self.get_pending_call(&dialog_id) {
394 call.dialog.reject(None, None).ok();
395 call.token.cancel();
396 }
397 match self.dialog_layer.get_dialog(&dialog_id) {
398 Some(dialog) => {
399 self.dialog_layer.remove_dialog(&dialog_id);
400 dialog.hangup().await.ok();
401 }
402 None => {}
403 }
404 Ok(())
405 }
406
407 pub async fn invite(
408 &self,
409 invite_option: InviteOption,
410 state_sender: DialogStateSender,
411 ) -> Result<(DialogId, Option<Vec<u8>>), rsipstack::Error> {
412 let (dialog, resp) = self
413 .dialog_layer
414 .do_invite(invite_option, state_sender)
415 .await?;
416
417 let offer = match resp {
418 Some(resp) => match resp.status_code.kind() {
419 rsip::StatusCodeKind::Successful => {
420 let offer = resp.body.clone();
421 Some(offer)
422 }
423 _ => {
424 let reason = resp
425 .reason_phrase()
426 .unwrap_or(&resp.status_code.to_string())
427 .to_string();
428 return Err(rsipstack::Error::DialogError(
429 reason,
430 dialog.id(),
431 resp.status_code,
432 ));
433 }
434 },
435 None => {
436 return Err(rsipstack::Error::DialogError(
437 "no response received".to_string(),
438 dialog.id(),
439 rsip::StatusCode::NotAcceptableHere,
440 ));
441 }
442 };
443 Ok((dialog.id(), offer))
444 }
445}