1use crate::call::active_call::ActiveCallStateRef;
2use crate::callrecord::CallRecordHangupReason;
3use crate::event::EventSender;
4use crate::media::TrackId;
5use crate::media::stream::MediaStream;
6use crate::useragent::invitation::PendingDialog;
7use anyhow::Result;
8use chrono::Utc;
9use rsipstack::dialog::DialogId;
10use rsipstack::dialog::dialog::{
11 Dialog, DialogState, DialogStateReceiver, DialogStateSender, TerminatedReason,
12};
13use rsipstack::dialog::dialog_layer::DialogLayer;
14use rsipstack::dialog::invitation::InviteOption;
15use rsipstack::rsip_ext::RsipResponseExt;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tokio_util::sync::CancellationToken;
19use tracing::{info, warn};
20
/// Wraps a [`DialogStateReceiver`] and remembers the id carried by the most
/// recently received [`DialogState`], so that the matching dialog can be
/// removed from the [`DialogLayer`] and hung up when the guard is released
/// (either explicitly through `drop_async` or implicitly through `Drop`).
pub struct DialogStateReceiverGuard {
    /// Dialog layer the tracked dialog is looked up in and removed from.
    pub(super) dialog_layer: Arc<DialogLayer>,
    /// Channel the dialog states are read from.
    pub(super) receiver: DialogStateReceiver,
    /// Id taken from the last received state; `None` until a state has been
    /// received and again once the dialog has been taken for hangup.
    pub(super) dialog_id: Option<DialogId>,
}
26
27impl DialogStateReceiverGuard {
28 pub fn new(dialog_layer: Arc<DialogLayer>, receiver: DialogStateReceiver) -> Self {
29 Self {
30 dialog_layer,
31 receiver,
32 dialog_id: None,
33 }
34 }
35 pub async fn recv(&mut self) -> Option<DialogState> {
36 let state = self.receiver.recv().await;
37 if let Some(ref s) = state {
38 self.dialog_id = Some(s.id().clone());
39 }
40 state
41 }
42
43 fn take_dialog(&mut self) -> Option<Dialog> {
44 let id = match self.dialog_id.take() {
45 Some(id) => id,
46 None => return None,
47 };
48
49 match self.dialog_layer.get_dialog(&id) {
50 Some(dialog) => {
51 info!(%id, "dialog removed on drop");
52 self.dialog_layer.remove_dialog(&id);
53 return Some(dialog);
54 }
55 _ => {}
56 }
57 None
58 }
59
60 pub async fn drop_async(&mut self) {
61 if let Some(dialog) = self.take_dialog() {
62 if let Err(e) = dialog.hangup().await {
63 warn!(id=%dialog.id(), "error hanging up dialog on drop: {}", e);
64 }
65 }
66 }
67}
68
69impl Drop for DialogStateReceiverGuard {
70 fn drop(&mut self) {
71 if let Some(dialog) = self.take_dialog() {
72 crate::spawn(async move {
73 if let Err(e) = dialog.hangup().await {
74 warn!(id=%dialog.id(), "error hanging up dialog on drop: {}", e);
75 }
76 });
77 }
78 }
79}
80
/// Per-call state shared between the dialog event loop and the termination
/// handling that runs when this value is dropped.
pub(super) struct InviteDialogStates {
    /// When `false`, early responses only update ring time / status code and
    /// SDP answers from the peer are not applied to the media stream.
    pub is_client: bool,
    /// Identifier attached to log records for this call.
    pub session_id: String,
    /// Track id used in emitted session events and media stream calls.
    pub track_id: TrackId,
    /// Cancelled when this value is dropped; `process_dialog` also stops
    /// waiting for dialog states once it is cancelled.
    pub cancel_token: CancellationToken,
    /// Channel the session events (ringing, DTMF, track end, hangup) go to.
    pub event_sender: EventSender,
    /// Shared call state updated as the dialog progresses.
    pub call_state: ActiveCallStateRef,
    /// Media stream that receives remote descriptions and re-negotiations.
    pub media_stream: Arc<MediaStream>,
    /// Why the dialog ended; set by the event loop or on cancellation and
    /// read by `on_terminated`.
    pub terminated_reason: Option<TerminatedReason>,
    /// Set once an early response carrying a body has been seen on the
    /// client side.
    pub has_early_media: bool,
}
92
93impl InviteDialogStates {
94 pub(super) fn on_terminated(&mut self) {
95 let mut call_state_ref = match self.call_state.try_write() {
96 Ok(cs) => cs,
97 Err(_) => {
98 return;
99 }
100 };
101 let reason = &self.terminated_reason;
102 call_state_ref.last_status_code = match reason {
103 Some(TerminatedReason::UacCancel) => 487,
104 Some(TerminatedReason::UacBye) => 200,
105 Some(TerminatedReason::UacBusy) => 486,
106 Some(TerminatedReason::UasBye) => 200,
107 Some(TerminatedReason::UasBusy) => 486,
108 Some(TerminatedReason::UasDecline) => 603,
109 Some(TerminatedReason::UacOther(code)) => code.code(),
110 Some(TerminatedReason::UasOther(code)) => code.code(),
111 _ => 500, };
113
114 if call_state_ref.hangup_reason.is_none() {
115 call_state_ref.hangup_reason.replace(match reason {
116 Some(TerminatedReason::UacCancel) => CallRecordHangupReason::Canceled,
117 Some(TerminatedReason::UacBye) | Some(TerminatedReason::UacBusy) => {
118 CallRecordHangupReason::ByCaller
119 }
120 Some(TerminatedReason::UasBye) | Some(TerminatedReason::UasBusy) => {
121 CallRecordHangupReason::ByCallee
122 }
123 Some(TerminatedReason::UasDecline) => CallRecordHangupReason::ByCallee,
124 Some(TerminatedReason::UacOther(_)) => CallRecordHangupReason::ByCaller,
125 Some(TerminatedReason::UasOther(_)) => CallRecordHangupReason::ByCallee,
126 _ => CallRecordHangupReason::BySystem,
127 });
128 };
129 let initiator = match reason {
130 Some(TerminatedReason::UacCancel) => "caller".to_string(),
131 Some(TerminatedReason::UacBye) | Some(TerminatedReason::UacBusy) => {
132 "caller".to_string()
133 }
134 Some(TerminatedReason::UasBye)
135 | Some(TerminatedReason::UasBusy)
136 | Some(TerminatedReason::UasDecline) => "callee".to_string(),
137 _ => "system".to_string(),
138 };
139 self.event_sender
140 .send(crate::event::SessionEvent::TrackEnd {
141 track_id: self.track_id.clone(),
142 timestamp: crate::media::get_timestamp(),
143 duration: call_state_ref
144 .answer_time
145 .map(|t| (Utc::now() - t).num_milliseconds())
146 .unwrap_or_default() as u64,
147 ssrc: call_state_ref.ssrc,
148 play_id: None,
149 })
150 .ok();
151 let hangup_event =
152 call_state_ref.build_hangup_event(self.track_id.clone(), Some(initiator));
153 self.event_sender.send(hangup_event).ok();
154 }
155}
156
impl Drop for InviteDialogStates {
    /// Finalises the call when the state goes out of scope: first records the
    /// outcome and emits the closing events, then cancels `cancel_token`.
    fn drop(&mut self) {
        self.on_terminated();
        self.cancel_token.cancel();
    }
}
163
164impl DialogStateReceiverGuard {
165 pub(self) async fn dialog_event_loop(&mut self, states: &mut InviteDialogStates) -> Result<()> {
166 while let Some(event) = self.recv().await {
167 match event {
168 DialogState::Calling(dialog_id) => {
169 info!(session_id=states.session_id, %dialog_id, "dialog calling");
170 states.call_state.write().await.session_id = dialog_id.to_string();
171 }
172 DialogState::Trying(_) => {}
173 DialogState::Early(dialog_id, resp) => {
174 let code = resp.status_code.code();
175 let body = resp.body();
176 let answer = String::from_utf8_lossy(body);
177 let has_sdp = !answer.is_empty();
178 info!(session_id=states.session_id, %dialog_id, has_sdp=%has_sdp, "dialog early ({}): \n{}", code, answer);
179
180 {
181 let mut cs = states.call_state.write().await;
182 if cs.ring_time.is_none() {
183 cs.ring_time.replace(Utc::now());
184 }
185 cs.last_status_code = code;
186 }
187
188 if !states.is_client {
189 continue;
190 }
191
192 let refer = states.call_state.read().await.is_refer;
193
194 states
195 .event_sender
196 .send(crate::event::SessionEvent::Ringing {
197 track_id: states.track_id.clone(),
198 timestamp: crate::media::get_timestamp(),
199 early_media: has_sdp,
200 refer: Some(refer),
201 })?;
202
203 if has_sdp {
204 states.has_early_media = true;
205 states
206 .media_stream
207 .update_remote_description(&states.track_id, &answer.to_string())
208 .await?;
209 }
210 }
211 DialogState::Confirmed(dialog_id, msg) => {
212 info!(session_id=states.session_id, %dialog_id, has_early_media=%states.has_early_media, "dialog confirmed");
213 {
214 let mut cs = states.call_state.write().await;
215 cs.session_id = dialog_id.to_string();
216 cs.answer_time.replace(Utc::now());
217 cs.last_status_code = 200;
218 }
219 if states.is_client {
220 let answer = String::from_utf8_lossy(msg.body());
221 let answer = answer.trim();
222 if !answer.is_empty() {
223 if states.has_early_media {
224 info!(
225 session_id = states.session_id,
226 "updating remote description with final answer after early media (force=true)"
227 );
228 if let Err(e) = states
231 .media_stream
232 .update_remote_description_force(
233 &states.track_id,
234 &answer.to_string(),
235 )
236 .await
237 {
238 tracing::warn!(
239 session_id = states.session_id,
240 "failed to force update remote description on confirmed: {}",
241 e
242 );
243 }
244 } else {
245 if let Err(e) = states
246 .media_stream
247 .update_remote_description(
248 &states.track_id,
249 &answer.to_string(),
250 )
251 .await
252 {
253 tracing::warn!(
254 session_id = states.session_id,
255 "failed to update remote description on confirmed: {}",
256 e
257 );
258 }
259 }
260 }
261 }
262 }
263 DialogState::Info(dialog_id, req, tx_handle) => {
264 let body_str = String::from_utf8_lossy(req.body());
265 info!(session_id=states.session_id, %dialog_id, body=%body_str, "dialog info received");
266 if body_str.starts_with("Signal=") {
267 let digit = body_str.trim_start_matches("Signal=").chars().next();
268 if let Some(digit) = digit {
269 states.event_sender.send(crate::event::SessionEvent::Dtmf {
270 track_id: states.track_id.clone(),
271 timestamp: crate::media::get_timestamp(),
272 digit: digit.to_string(),
273 })?;
274 }
275 }
276 tx_handle.reply(rsip::StatusCode::OK).await.ok();
277 }
278 DialogState::Updated(dialog_id, _req, tx_handle) => {
279 info!(session_id = states.session_id, %dialog_id, "dialog update received");
280 let mut answer_sdp = None;
281 if let Some(sdp_body) = _req.body().get(..) {
282 let sdp_str = String::from_utf8_lossy(sdp_body);
283 if !sdp_str.is_empty()
284 && (_req.method == rsip::Method::Invite
285 || _req.method == rsip::Method::Update)
286 {
287 info!(session_id=states.session_id, %dialog_id, method=%_req.method, "handling re-invite/update offer");
288 match states
289 .media_stream
290 .handshake(&states.track_id, sdp_str.to_string(), None)
291 .await
292 {
293 Ok(sdp) => answer_sdp = Some(sdp),
294 Err(e) => {
295 warn!(
296 session_id = states.session_id,
297 "failed to handle re-invite: {}", e
298 );
299 }
300 }
301 } else {
302 info!(session_id=states.session_id, %dialog_id, "updating remote description:\n{}", sdp_str);
303 states
304 .media_stream
305 .update_remote_description(&states.track_id, &sdp_str.to_string())
306 .await?;
307 }
308 }
309
310 if let Some(sdp) = answer_sdp {
311 tx_handle
312 .respond(
313 rsip::StatusCode::OK,
314 Some(vec![rsip::Header::ContentType(
315 "application/sdp".to_string().into(),
316 )]),
317 Some(sdp.into()),
318 )
319 .await
320 .ok();
321 } else {
322 tx_handle.reply(rsip::StatusCode::OK).await.ok();
323 }
324 }
325 DialogState::Options(dialog_id, _req, tx_handle) => {
326 info!(session_id = states.session_id, %dialog_id, "dialog options received");
327 tx_handle.reply(rsip::StatusCode::OK).await.ok();
328 }
329 DialogState::Terminated(dialog_id, reason) => {
330 info!(
331 session_id = states.session_id,
332 ?dialog_id,
333 ?reason,
334 "dialog terminated"
335 );
336 states.terminated_reason = Some(reason.clone());
337 return Ok(());
338 }
339 other_state => {
340 info!(
341 session_id = states.session_id,
342 %other_state,
343 "dialog received other state"
344 );
345 }
346 }
347 }
348 Ok(())
349 }
350
351 pub(super) async fn process_dialog(&mut self, mut states: InviteDialogStates) {
352 let token = states.cancel_token.clone();
353 tokio::select! {
354 _ = token.cancelled() => {
355 states.terminated_reason = Some(TerminatedReason::UacCancel);
356 }
357 _ = self.dialog_event_loop(&mut states) => {}
358 };
359 self.drop_async().await;
360 }
361}
362
/// Entry point for placing, rejecting and hanging up calls.
///
/// Cloning is cheap: both fields are reference-counted, so clones share the
/// same dialog layer and the same table of pending dialogs.
#[derive(Clone)]
pub struct Invitation {
    /// Dialog layer used to create, look up and remove dialogs.
    pub dialog_layer: Arc<DialogLayer>,
    /// Pending dialogs keyed by dialog id, shared between clones.
    pub pending_dialogs: Arc<std::sync::Mutex<HashMap<DialogId, PendingDialog>>>,
}
368
369impl Invitation {
370 pub fn new(dialog_layer: Arc<DialogLayer>) -> Self {
371 Self {
372 dialog_layer,
373 pending_dialogs: Arc::new(std::sync::Mutex::new(HashMap::new())),
374 }
375 }
376
377 pub fn add_pending(&self, dialog_id: DialogId, pending: PendingDialog) {
378 self.pending_dialogs
379 .lock()
380 .map(|mut ps| ps.insert(dialog_id, pending))
381 .ok();
382 }
383
384 pub fn get_pending_call(&self, dialog_id: &DialogId) -> Option<PendingDialog> {
385 self.pending_dialogs
386 .lock()
387 .ok()
388 .and_then(|mut ps| ps.remove(dialog_id))
389 }
390
391 pub fn has_pending_call(&self, dialog_id: &DialogId) -> bool {
392 self.pending_dialogs
393 .lock()
394 .ok()
395 .map(|ps| ps.contains_key(dialog_id))
396 .unwrap_or(false)
397 }
398
399 pub fn find_dialog_id_by_session_id(&self, session_id: &str) -> Option<DialogId> {
400 self.pending_dialogs.lock().ok().and_then(|ps| {
401 ps.iter()
402 .find(|(id, _)| id.to_string() == session_id)
403 .map(|(id, _)| id.clone())
404 })
405 }
406
407 pub async fn hangup(
408 &self,
409 dialog_id: DialogId,
410 code: Option<rsip::StatusCode>,
411 reason: Option<String>,
412 ) -> Result<()> {
413 if let Some(call) = self.get_pending_call(&dialog_id) {
414 call.dialog.reject(code, reason).ok();
415 call.token.cancel();
416 }
417 match self.dialog_layer.get_dialog(&dialog_id) {
418 Some(dialog) => {
419 self.dialog_layer.remove_dialog(&dialog_id);
420 dialog.hangup().await.ok();
421 }
422 None => {}
423 }
424 Ok(())
425 }
426
427 pub async fn reject(&self, dialog_id: DialogId) -> Result<()> {
428 if let Some(call) = self.get_pending_call(&dialog_id) {
429 call.dialog.reject(None, None).ok();
430 call.token.cancel();
431 }
432 match self.dialog_layer.get_dialog(&dialog_id) {
433 Some(dialog) => {
434 self.dialog_layer.remove_dialog(&dialog_id);
435 dialog.hangup().await.ok();
436 }
437 None => {}
438 }
439 Ok(())
440 }
441
442 pub async fn invite(
443 &self,
444 invite_option: InviteOption,
445 state_sender: DialogStateSender,
446 ) -> Result<(DialogId, Option<Vec<u8>>), rsipstack::Error> {
447 let (dialog, resp) = self
448 .dialog_layer
449 .do_invite(invite_option, state_sender)
450 .await?;
451
452 let offer = match resp {
453 Some(resp) => match resp.status_code.kind() {
454 rsip::StatusCodeKind::Successful => {
455 let offer = resp.body.clone();
456 Some(offer)
457 }
458 _ => {
459 let reason = resp
460 .reason_phrase()
461 .unwrap_or(&resp.status_code.to_string())
462 .to_string();
463 return Err(rsipstack::Error::DialogError(
464 reason,
465 dialog.id(),
466 resp.status_code,
467 ));
468 }
469 },
470 None => {
471 return Err(rsipstack::Error::DialogError(
472 "no response received".to_string(),
473 dialog.id(),
474 rsip::StatusCode::NotAcceptableHere,
475 ));
476 }
477 };
478 Ok((dialog.id(), offer))
479 }
480}