1use crate::call::active_call::ActiveCallStateRef;
2use crate::callrecord::CallRecordHangupReason;
3use crate::event::EventSender;
4use crate::media::TrackId;
5use crate::media::stream::MediaStream;
6use crate::useragent::invitation::PendingDialog;
7use anyhow::Result;
8use chrono::Utc;
9use rsipstack::dialog::DialogId;
10use rsipstack::dialog::dialog::{
11 Dialog, DialogState, DialogStateReceiver, DialogStateSender, TerminatedReason,
12};
13use rsipstack::dialog::dialog_layer::DialogLayer;
14use rsipstack::dialog::invitation::InviteOption;
15use rsipstack::rsip_ext::RsipResponseExt;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tokio_util::sync::CancellationToken;
19use tracing::{info, warn};
20
21pub struct DialogStateReceiverGuard {
22 pub(super) dialog_layer: Arc<DialogLayer>,
23 pub(super) receiver: DialogStateReceiver,
24 pub(super) dialog_id: Option<DialogId>,
25 pub(super) hangup_headers: Option<Vec<rsip::Header>>,
26}
27
28impl DialogStateReceiverGuard {
29 pub fn new(
30 dialog_layer: Arc<DialogLayer>,
31 receiver: DialogStateReceiver,
32 hangup_headers: Option<Vec<rsip::Header>>,
33 ) -> Self {
34 Self {
35 dialog_layer,
36 receiver,
37 dialog_id: None,
38 hangup_headers,
39 }
40 }
41 pub async fn recv(&mut self) -> Option<DialogState> {
42 let state = self.receiver.recv().await;
43 if let Some(ref s) = state {
44 self.dialog_id = Some(s.id().clone());
45 }
46 state
47 }
48
49 fn take_dialog(&mut self) -> Option<Dialog> {
50 let id = match self.dialog_id.take() {
51 Some(id) => id,
52 None => return None,
53 };
54
55 match self.dialog_layer.get_dialog(&id) {
56 Some(dialog) => {
57 info!(%id, "dialog removed on drop");
58 self.dialog_layer.remove_dialog(&id);
59 return Some(dialog);
60 }
61 _ => {}
62 }
63 None
64 }
65
66 pub async fn drop_async(&mut self) {
67 if let Some(dialog) = self.take_dialog() {
68 if let Err(e) = dialog.hangup_with_headers(self.hangup_headers.take()).await {
69 warn!(id=%dialog.id(), "error hanging up dialog on drop: {}", e);
70 }
71 }
72 }
73}
74
75impl Drop for DialogStateReceiverGuard {
76 fn drop(&mut self) {
77 if let Some(dialog) = self.take_dialog() {
78 crate::spawn(async move {
79 if let Err(e) = dialog.hangup().await {
80 warn!(id=%dialog.id(), "error hanging up dialog on drop: {}", e);
81 }
82 });
83 }
84 }
85}
86
87pub(super) struct InviteDialogStates {
88 pub is_client: bool,
89 pub session_id: String,
90 pub track_id: TrackId,
91 pub cancel_token: CancellationToken,
92 pub event_sender: EventSender,
93 pub call_state: ActiveCallStateRef,
94 pub media_stream: Arc<MediaStream>,
95 pub terminated_reason: Option<TerminatedReason>,
96 pub has_early_media: bool,
97}
98
99impl InviteDialogStates {
100 pub(super) fn on_terminated(&mut self) {
101 let mut call_state_ref = match self.call_state.try_write() {
102 Ok(cs) => cs,
103 Err(_) => {
104 return;
105 }
106 };
107 let reason = &self.terminated_reason;
108 call_state_ref.last_status_code = match reason {
109 Some(TerminatedReason::UacCancel) => 487,
110 Some(TerminatedReason::UacBye) => 200,
111 Some(TerminatedReason::UacBusy) => 486,
112 Some(TerminatedReason::UasBye) => 200,
113 Some(TerminatedReason::UasBusy) => 486,
114 Some(TerminatedReason::UasDecline) => 603,
115 Some(TerminatedReason::UacOther(code)) => code.code(),
116 Some(TerminatedReason::UasOther(code)) => code.code(),
117 _ => 500, };
119
120 if call_state_ref.hangup_reason.is_none() {
121 call_state_ref.hangup_reason.replace(match reason {
122 Some(TerminatedReason::UacCancel) => CallRecordHangupReason::Canceled,
123 Some(TerminatedReason::UacBye) | Some(TerminatedReason::UacBusy) => {
124 CallRecordHangupReason::ByCaller
125 }
126 Some(TerminatedReason::UasBye) | Some(TerminatedReason::UasBusy) => {
127 CallRecordHangupReason::ByCallee
128 }
129 Some(TerminatedReason::UasDecline) => CallRecordHangupReason::ByCallee,
130 Some(TerminatedReason::UacOther(_)) => CallRecordHangupReason::ByCaller,
131 Some(TerminatedReason::UasOther(_)) => CallRecordHangupReason::ByCallee,
132 _ => CallRecordHangupReason::BySystem,
133 });
134 };
135 let initiator = match reason {
136 Some(TerminatedReason::UacCancel) => "caller".to_string(),
137 Some(TerminatedReason::UacBye) | Some(TerminatedReason::UacBusy) => {
138 "caller".to_string()
139 }
140 Some(TerminatedReason::UasBye)
141 | Some(TerminatedReason::UasBusy)
142 | Some(TerminatedReason::UasDecline) => "callee".to_string(),
143 _ => "system".to_string(),
144 };
145 self.event_sender
146 .send(crate::event::SessionEvent::TrackEnd {
147 track_id: self.track_id.clone(),
148 timestamp: crate::media::get_timestamp(),
149 duration: call_state_ref
150 .answer_time
151 .map(|t| (Utc::now() - t).num_milliseconds())
152 .unwrap_or_default() as u64,
153 ssrc: call_state_ref.ssrc,
154 play_id: None,
155 })
156 .ok();
157 let hangup_event =
158 call_state_ref.build_hangup_event(self.track_id.clone(), Some(initiator));
159 self.event_sender.send(hangup_event).ok();
160 }
161}
162
163impl Drop for InviteDialogStates {
164 fn drop(&mut self) {
165 self.on_terminated();
166 self.cancel_token.cancel();
167 }
168}
169
170impl DialogStateReceiverGuard {
171 pub(self) async fn dialog_event_loop(&mut self, states: &mut InviteDialogStates) -> Result<()> {
172 while let Some(event) = self.recv().await {
173 match event {
174 DialogState::Calling(dialog_id) => {
175 info!(session_id=states.session_id, %dialog_id, "dialog calling");
176 states.call_state.write().await.session_id = dialog_id.to_string();
177 }
178 DialogState::Trying(_) => {}
179 DialogState::Early(dialog_id, resp) => {
180 let code = resp.status_code.code();
181 let body = resp.body();
182 let answer = String::from_utf8_lossy(body);
183 let has_sdp = !answer.is_empty();
184 info!(session_id=states.session_id, %dialog_id, has_sdp=%has_sdp, "dialog early ({}): \n{}", code, answer);
185
186 {
187 let mut cs = states.call_state.write().await;
188 if cs.ring_time.is_none() {
189 cs.ring_time.replace(Utc::now());
190 }
191 cs.last_status_code = code;
192 }
193
194 if !states.is_client {
195 continue;
196 }
197
198 let refer = states.call_state.read().await.is_refer;
199
200 states
201 .event_sender
202 .send(crate::event::SessionEvent::Ringing {
203 track_id: states.track_id.clone(),
204 timestamp: crate::media::get_timestamp(),
205 early_media: has_sdp,
206 refer: Some(refer),
207 })?;
208
209 if has_sdp {
210 states.has_early_media = true;
211 states
212 .media_stream
213 .update_remote_description(&states.track_id, &answer.to_string())
214 .await?;
215 }
216 }
217 DialogState::Confirmed(dialog_id, msg) => {
218 info!(session_id=states.session_id, %dialog_id, has_early_media=%states.has_early_media, "dialog confirmed");
219 {
220 let mut cs = states.call_state.write().await;
221 cs.session_id = dialog_id.to_string();
222 cs.answer_time.replace(Utc::now());
223 cs.last_status_code = 200;
224 }
225 if states.is_client {
226 let answer = String::from_utf8_lossy(msg.body());
227 let answer = answer.trim();
228 if !answer.is_empty() {
229 if states.has_early_media {
230 info!(
231 session_id = states.session_id,
232 "updating remote description with final answer after early media (force=true)"
233 );
234 if let Err(e) = states
237 .media_stream
238 .update_remote_description_force(
239 &states.track_id,
240 &answer.to_string(),
241 )
242 .await
243 {
244 tracing::warn!(
245 session_id = states.session_id,
246 "failed to force update remote description on confirmed: {}",
247 e
248 );
249 }
250 } else {
251 if let Err(e) = states
252 .media_stream
253 .update_remote_description(
254 &states.track_id,
255 &answer.to_string(),
256 )
257 .await
258 {
259 tracing::warn!(
260 session_id = states.session_id,
261 "failed to update remote description on confirmed: {}",
262 e
263 );
264 }
265 }
266 }
267 }
268 }
269 DialogState::Info(dialog_id, req, tx_handle) => {
270 let body_str = String::from_utf8_lossy(req.body());
271 info!(session_id=states.session_id, %dialog_id, body=%body_str, "dialog info received");
272 if body_str.starts_with("Signal=") {
273 let digit = body_str.trim_start_matches("Signal=").chars().next();
274 if let Some(digit) = digit {
275 states.event_sender.send(crate::event::SessionEvent::Dtmf {
276 track_id: states.track_id.clone(),
277 timestamp: crate::media::get_timestamp(),
278 digit: digit.to_string(),
279 })?;
280 }
281 }
282 tx_handle.reply(rsip::StatusCode::OK).await.ok();
283 }
284 DialogState::Updated(dialog_id, _req, tx_handle) => {
285 info!(session_id = states.session_id, %dialog_id, "dialog update received");
286 let mut answer_sdp = None;
287 if let Some(sdp_body) = _req.body().get(..) {
288 let sdp_str = String::from_utf8_lossy(sdp_body);
289 if !sdp_str.is_empty()
290 && (_req.method == rsip::Method::Invite
291 || _req.method == rsip::Method::Update)
292 {
293 info!(session_id=states.session_id, %dialog_id, method=%_req.method, "handling re-invite/update offer");
294 match states
295 .media_stream
296 .handshake(&states.track_id, sdp_str.to_string(), None)
297 .await
298 {
299 Ok(sdp) => answer_sdp = Some(sdp),
300 Err(e) => {
301 warn!(
302 session_id = states.session_id,
303 "failed to handle re-invite: {}", e
304 );
305 }
306 }
307 } else {
308 info!(session_id=states.session_id, %dialog_id, "updating remote description:\n{}", sdp_str);
309 states
310 .media_stream
311 .update_remote_description(&states.track_id, &sdp_str.to_string())
312 .await?;
313 }
314 }
315
316 if let Some(sdp) = answer_sdp {
317 tx_handle
318 .respond(
319 rsip::StatusCode::OK,
320 Some(vec![rsip::Header::ContentType(
321 "application/sdp".to_string().into(),
322 )]),
323 Some(sdp.into()),
324 )
325 .await
326 .ok();
327 } else {
328 tx_handle.reply(rsip::StatusCode::OK).await.ok();
329 }
330 }
331 DialogState::Options(dialog_id, _req, tx_handle) => {
332 info!(session_id = states.session_id, %dialog_id, "dialog options received");
333 tx_handle.reply(rsip::StatusCode::OK).await.ok();
334 }
335 DialogState::Terminated(dialog_id, reason) => {
336 info!(
337 session_id = states.session_id,
338 ?dialog_id,
339 ?reason,
340 "dialog terminated"
341 );
342 states.terminated_reason = Some(reason.clone());
343 return Ok(());
344 }
345 other_state => {
346 info!(
347 session_id = states.session_id,
348 %other_state,
349 "dialog received other state"
350 );
351 }
352 }
353 }
354 Ok(())
355 }
356
357 pub(super) async fn process_dialog(&mut self, mut states: InviteDialogStates) {
358 let token = states.cancel_token.clone();
359 tokio::select! {
360 _ = token.cancelled() => {
361 states.terminated_reason = Some(TerminatedReason::UacCancel);
362 }
363 _ = self.dialog_event_loop(&mut states) => {}
364 };
365 self.drop_async().await;
366 }
367}
368
369#[derive(Clone)]
370pub struct Invitation {
371 pub dialog_layer: Arc<DialogLayer>,
372 pub pending_dialogs: Arc<std::sync::Mutex<HashMap<DialogId, PendingDialog>>>,
373}
374
375impl Invitation {
376 pub fn new(dialog_layer: Arc<DialogLayer>) -> Self {
377 Self {
378 dialog_layer,
379 pending_dialogs: Arc::new(std::sync::Mutex::new(HashMap::new())),
380 }
381 }
382
383 pub fn add_pending(&self, dialog_id: DialogId, pending: PendingDialog) {
384 self.pending_dialogs
385 .lock()
386 .map(|mut ps| ps.insert(dialog_id, pending))
387 .ok();
388 }
389
390 pub fn get_pending_call(&self, dialog_id: &DialogId) -> Option<PendingDialog> {
391 self.pending_dialogs
392 .lock()
393 .ok()
394 .and_then(|mut ps| ps.remove(dialog_id))
395 }
396
397 pub fn has_pending_call(&self, dialog_id: &DialogId) -> bool {
398 self.pending_dialogs
399 .lock()
400 .ok()
401 .map(|ps| ps.contains_key(dialog_id))
402 .unwrap_or(false)
403 }
404
405 pub fn find_dialog_id_by_session_id(&self, session_id: &str) -> Option<DialogId> {
406 self.pending_dialogs.lock().ok().and_then(|ps| {
407 ps.iter()
408 .find(|(id, _)| id.to_string() == session_id)
409 .map(|(id, _)| id.clone())
410 })
411 }
412
413 pub async fn hangup(
414 &self,
415 dialog_id: DialogId,
416 code: Option<rsip::StatusCode>,
417 reason: Option<String>,
418 ) -> Result<()> {
419 if let Some(call) = self.get_pending_call(&dialog_id) {
420 call.dialog.reject(code, reason).ok();
421 call.token.cancel();
422 }
423 match self.dialog_layer.get_dialog(&dialog_id) {
424 Some(dialog) => {
425 self.dialog_layer.remove_dialog(&dialog_id);
426 dialog.hangup().await.ok();
427 }
428 None => {}
429 }
430 Ok(())
431 }
432
433 pub async fn reject(&self, dialog_id: DialogId) -> Result<()> {
434 if let Some(call) = self.get_pending_call(&dialog_id) {
435 call.dialog.reject(None, None).ok();
436 call.token.cancel();
437 }
438 match self.dialog_layer.get_dialog(&dialog_id) {
439 Some(dialog) => {
440 self.dialog_layer.remove_dialog(&dialog_id);
441 dialog.hangup().await.ok();
442 }
443 None => {}
444 }
445 Ok(())
446 }
447
448 pub async fn invite(
449 &self,
450 invite_option: InviteOption,
451 state_sender: DialogStateSender,
452 ) -> Result<(DialogId, Option<Vec<u8>>), rsipstack::Error> {
453 let (dialog, resp) = self
454 .dialog_layer
455 .do_invite(invite_option, state_sender)
456 .await?;
457
458 let offer = match resp {
459 Some(resp) => match resp.status_code.kind() {
460 rsip::StatusCodeKind::Successful => {
461 let offer = resp.body.clone();
462 Some(offer)
463 }
464 _ => {
465 let reason = resp
466 .reason_phrase()
467 .unwrap_or(&resp.status_code.to_string())
468 .to_string();
469 return Err(rsipstack::Error::DialogError(
470 reason,
471 dialog.id(),
472 resp.status_code,
473 ));
474 }
475 },
476 None => {
477 return Err(rsipstack::Error::DialogError(
478 "no response received".to_string(),
479 dialog.id(),
480 rsip::StatusCode::NotAcceptableHere,
481 ));
482 }
483 };
484 Ok((dialog.id(), offer))
485 }
486}