1use crate::call::active_call::ActiveCallStateRef;
2use crate::callrecord::CallRecordHangupReason;
3use crate::event::EventSender;
4use crate::media::TrackId;
5use crate::media::stream::MediaStream;
6use crate::useragent::invitation::PendingDialog;
7use anyhow::Result;
8use chrono::Utc;
9use rsipstack::dialog::DialogId;
10use rsipstack::dialog::dialog::{
11 Dialog, DialogState, DialogStateReceiver, DialogStateSender, TerminatedReason,
12};
13use rsipstack::dialog::dialog_layer::DialogLayer;
14use rsipstack::dialog::invitation::InviteOption;
15use rsipstack::rsip_ext::RsipResponseExt;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tokio_util::sync::CancellationToken;
19use tracing::{info, warn};
20
21pub struct DialogStateReceiverGuard {
22 pub(super) dialog_layer: Arc<DialogLayer>,
23 pub(super) receiver: DialogStateReceiver,
24 pub(super) dialog_id: Option<DialogId>,
25}
26
27impl DialogStateReceiverGuard {
28 pub fn new(dialog_layer: Arc<DialogLayer>, receiver: DialogStateReceiver) -> Self {
29 Self {
30 dialog_layer,
31 receiver,
32 dialog_id: None,
33 }
34 }
35 pub async fn recv(&mut self) -> Option<DialogState> {
36 let state = self.receiver.recv().await;
37 if let Some(ref s) = state {
38 self.dialog_id = Some(s.id().clone());
39 }
40 state
41 }
42
43 fn take_dialog(&mut self) -> Option<Dialog> {
44 let id = match self.dialog_id.take() {
45 Some(id) => id,
46 None => return None,
47 };
48
49 match self.dialog_layer.get_dialog(&id) {
50 Some(dialog) => {
51 info!(%id, "dialog removed on drop");
52 self.dialog_layer.remove_dialog(&id);
53 return Some(dialog);
54 }
55 _ => {}
56 }
57 None
58 }
59
60 pub async fn drop_async(&mut self) {
61 if let Some(dialog) = self.take_dialog() {
62 if let Err(e) = dialog.hangup().await {
63 warn!(id=%dialog.id(), "error hanging up dialog on drop: {}", e);
64 }
65 }
66 }
67}
68
69impl Drop for DialogStateReceiverGuard {
70 fn drop(&mut self) {
71 if let Some(dialog) = self.take_dialog() {
72 crate::spawn(async move {
73 if let Err(e) = dialog.hangup().await {
74 warn!(id=%dialog.id(), "error hanging up dialog on drop: {}", e);
75 }
76 });
77 }
78 }
79}
80
81pub(super) struct InviteDialogStates {
82 pub is_client: bool,
83 pub session_id: String,
84 pub track_id: TrackId,
85 pub cancel_token: CancellationToken,
86 pub event_sender: EventSender,
87 pub call_state: ActiveCallStateRef,
88 pub media_stream: Arc<MediaStream>,
89 pub terminated_reason: Option<TerminatedReason>,
90 pub has_early_media: bool,
91}
92
93impl InviteDialogStates {
94 pub(super) fn on_terminated(&mut self) {
95 let mut call_state_ref = match self.call_state.try_write() {
96 Ok(cs) => cs,
97 Err(_) => {
98 return;
99 }
100 };
101 let reason = &self.terminated_reason;
102 call_state_ref.last_status_code = match reason {
103 Some(TerminatedReason::UacCancel) => 487,
104 Some(TerminatedReason::UacBye) => 200,
105 Some(TerminatedReason::UacBusy) => 486,
106 Some(TerminatedReason::UasBye) => 200,
107 Some(TerminatedReason::UasBusy) => 486,
108 Some(TerminatedReason::UasDecline) => 603,
109 Some(TerminatedReason::UacOther(code)) => code.code(),
110 Some(TerminatedReason::UasOther(code)) => code.code(),
111 _ => 500, };
113
114 if call_state_ref.hangup_reason.is_none() {
115 call_state_ref.hangup_reason.replace(match reason {
116 Some(TerminatedReason::UacCancel) => CallRecordHangupReason::Canceled,
117 Some(TerminatedReason::UacBye) | Some(TerminatedReason::UacBusy) => {
118 CallRecordHangupReason::ByCaller
119 }
120 Some(TerminatedReason::UasBye) | Some(TerminatedReason::UasBusy) => {
121 CallRecordHangupReason::ByCallee
122 }
123 Some(TerminatedReason::UasDecline) => CallRecordHangupReason::ByCallee,
124 Some(TerminatedReason::UacOther(_)) => CallRecordHangupReason::ByCaller,
125 Some(TerminatedReason::UasOther(_)) => CallRecordHangupReason::ByCallee,
126 _ => CallRecordHangupReason::BySystem,
127 });
128 };
129 let initiator = match reason {
130 Some(TerminatedReason::UacCancel) => "caller".to_string(),
131 Some(TerminatedReason::UacBye) | Some(TerminatedReason::UacBusy) => {
132 "caller".to_string()
133 }
134 Some(TerminatedReason::UasBye)
135 | Some(TerminatedReason::UasBusy)
136 | Some(TerminatedReason::UasDecline) => "callee".to_string(),
137 _ => "system".to_string(),
138 };
139 self.event_sender
140 .send(crate::event::SessionEvent::TrackEnd {
141 track_id: self.track_id.clone(),
142 timestamp: crate::media::get_timestamp(),
143 duration: call_state_ref
144 .answer_time
145 .map(|t| (Utc::now() - t).num_milliseconds())
146 .unwrap_or_default() as u64,
147 ssrc: call_state_ref.ssrc,
148 play_id: None,
149 })
150 .ok();
151 let hangup_event =
152 call_state_ref.build_hangup_event(self.track_id.clone(), Some(initiator));
153 self.event_sender.send(hangup_event).ok();
154 }
155}
156
157impl Drop for InviteDialogStates {
158 fn drop(&mut self) {
159 self.on_terminated();
160 self.cancel_token.cancel();
161 }
162}
163
164impl DialogStateReceiverGuard {
165 pub(self) async fn dialog_event_loop(&mut self, states: &mut InviteDialogStates) -> Result<()> {
166 while let Some(event) = self.recv().await {
167 match event {
168 DialogState::Calling(dialog_id) => {
169 info!(session_id=states.session_id, %dialog_id, "dialog calling");
170 states.call_state.write().await.session_id = dialog_id.to_string();
171 }
172 DialogState::Trying(_) => {}
173 DialogState::Early(dialog_id, resp) => {
174 let code = resp.status_code.code();
175 let body = resp.body();
176 let answer = String::from_utf8_lossy(body);
177 let has_sdp = !answer.is_empty();
178 info!(session_id=states.session_id, %dialog_id, has_sdp=%has_sdp, "dialog early ({}): \n{}", code, answer);
179
180 {
181 let mut cs = states.call_state.write().await;
182 if cs.ring_time.is_none() {
183 cs.ring_time.replace(Utc::now());
184 }
185 cs.last_status_code = code;
186 }
187
188 if !states.is_client {
189 continue;
190 }
191
192 let refer = states.call_state.read().await.is_refer;
193
194 states
195 .event_sender
196 .send(crate::event::SessionEvent::Ringing {
197 track_id: states.track_id.clone(),
198 timestamp: crate::media::get_timestamp(),
199 early_media: has_sdp,
200 refer: Some(refer),
201 })?;
202
203 if has_sdp {
204 states.has_early_media = true;
205 states
206 .media_stream
207 .update_remote_description(&states.track_id, &answer.to_string())
208 .await?;
209 }
210 }
211 DialogState::Confirmed(dialog_id, msg) => {
212 info!(session_id=states.session_id, %dialog_id, has_early_media=%states.has_early_media, "dialog confirmed");
213 {
214 let mut cs = states.call_state.write().await;
215 cs.session_id = dialog_id.to_string();
216 cs.answer_time.replace(Utc::now());
217 cs.last_status_code = 200;
218 }
219 if states.is_client {
220 let answer = String::from_utf8_lossy(msg.body());
221 let answer = answer.trim();
222 if !answer.is_empty() {
223 if states.has_early_media {
224 info!(
225 session_id = states.session_id,
226 "updating remote description with final answer after early media"
227 );
228 }
229 if let Err(e) = states
230 .media_stream
231 .update_remote_description(&states.track_id, &answer.to_string())
232 .await
233 {
234 tracing::warn!(
235 session_id = states.session_id,
236 "failed to update remote description on confirmed: {}",
237 e
238 );
239 }
240 }
241 }
242 }
243 DialogState::Info(dialog_id, req, tx_handle) => {
244 let body_str = String::from_utf8_lossy(req.body());
245 info!(session_id=states.session_id, %dialog_id, body=%body_str, "dialog info received");
246 if body_str.starts_with("Signal=") {
247 let digit = body_str.trim_start_matches("Signal=").chars().next();
248 if let Some(digit) = digit {
249 states.event_sender.send(crate::event::SessionEvent::Dtmf {
250 track_id: states.track_id.clone(),
251 timestamp: crate::media::get_timestamp(),
252 digit: digit.to_string(),
253 })?;
254 }
255 }
256 tx_handle.reply(rsip::StatusCode::OK).await.ok();
257 }
258 DialogState::Updated(dialog_id, _req, tx_handle) => {
259 info!(session_id = states.session_id, %dialog_id, "dialog update received");
260 if let Some(sdp_body) = _req.body().get(..) {
261 let sdp_str = String::from_utf8_lossy(sdp_body);
262 info!(session_id=states.session_id, %dialog_id, "updating remote description:\n{}", sdp_str);
263 states
264 .media_stream
265 .update_remote_description(&states.track_id, &sdp_str.to_string())
266 .await?;
267 }
268 tx_handle.reply(rsip::StatusCode::OK).await.ok();
269 }
270 DialogState::Options(dialog_id, _req, tx_handle) => {
271 info!(session_id = states.session_id, %dialog_id, "dialog options received");
272 tx_handle.reply(rsip::StatusCode::OK).await.ok();
273 }
274 DialogState::Terminated(dialog_id, reason) => {
275 info!(
276 session_id = states.session_id,
277 ?dialog_id,
278 ?reason,
279 "dialog terminated"
280 );
281 states.terminated_reason = Some(reason.clone());
282 return Ok(());
283 }
284 other_state => {
285 info!(
286 session_id = states.session_id,
287 %other_state,
288 "dialog received other state"
289 );
290 }
291 }
292 }
293 Ok(())
294 }
295
296 pub(super) async fn process_dialog(&mut self, mut states: InviteDialogStates) {
297 let token = states.cancel_token.clone();
298 tokio::select! {
299 _ = token.cancelled() => {
300 states.terminated_reason = Some(TerminatedReason::UacCancel);
301 }
302 _ = self.dialog_event_loop(&mut states) => {}
303 };
304 self.drop_async().await;
305 }
306}
307
308#[derive(Clone)]
309pub struct Invitation {
310 pub dialog_layer: Arc<DialogLayer>,
311 pub pending_dialogs: Arc<std::sync::Mutex<HashMap<String, PendingDialog>>>,
312}
313
314impl Invitation {
315 pub fn new(dialog_layer: Arc<DialogLayer>) -> Self {
316 Self {
317 dialog_layer,
318 pending_dialogs: Arc::new(std::sync::Mutex::new(HashMap::new())),
319 }
320 }
321
322 pub fn add_pending(&self, session_id: String, pending: PendingDialog) {
323 self.pending_dialogs
324 .lock()
325 .map(|mut ps| ps.insert(session_id, pending))
326 .ok();
327 }
328
329 pub fn get_pending_call(&self, session_id: &String) -> Option<PendingDialog> {
330 self.pending_dialogs
331 .lock()
332 .ok()
333 .map(|mut ps| ps.remove(session_id))
334 .flatten()
335 }
336
337 pub fn has_pending_call(&self, session_id: &str) -> Option<DialogId> {
338 self.pending_dialogs
339 .lock()
340 .ok()
341 .map(|ps| ps.get(session_id).map(|d| d.dialog.id()))
342 .flatten()
343 }
344
345 pub async fn hangup(
346 &self,
347 dialog_id: DialogId,
348 code: Option<rsip::StatusCode>,
349 reason: Option<String>,
350 ) -> Result<()> {
351 if let Some(call) = self.get_pending_call(&dialog_id.to_string()) {
352 call.dialog.reject(code, reason).ok();
353 call.token.cancel();
354 }
355 match self.dialog_layer.get_dialog(&dialog_id) {
356 Some(dialog) => {
357 self.dialog_layer.remove_dialog(&dialog_id);
358 dialog.hangup().await.ok();
359 }
360 None => {}
361 }
362 Ok(())
363 }
364
365 pub async fn reject(&self, dialog_id: DialogId) -> Result<()> {
366 if let Some(call) = self.get_pending_call(&dialog_id.to_string()) {
367 call.dialog.reject(None, None).ok();
368 call.token.cancel();
369 }
370 match self.dialog_layer.get_dialog(&dialog_id) {
371 Some(dialog) => {
372 self.dialog_layer.remove_dialog(&dialog_id);
373 dialog.hangup().await.ok();
374 }
375 None => {}
376 }
377 Ok(())
378 }
379
380 pub async fn invite(
381 &self,
382 invite_option: InviteOption,
383 state_sender: DialogStateSender,
384 ) -> Result<(DialogId, Option<Vec<u8>>), rsipstack::Error> {
385 let (dialog, resp) = self
386 .dialog_layer
387 .do_invite(invite_option, state_sender)
388 .await?;
389
390 let offer = match resp {
391 Some(resp) => match resp.status_code.kind() {
392 rsip::StatusCodeKind::Successful => {
393 let offer = resp.body.clone();
394 Some(offer)
395 }
396 _ => {
397 let reason = resp
398 .reason_phrase()
399 .unwrap_or(&resp.status_code.to_string())
400 .to_string();
401 return Err(rsipstack::Error::DialogError(
402 reason,
403 dialog.id(),
404 resp.status_code,
405 ));
406 }
407 },
408 None => {
409 return Err(rsipstack::Error::DialogError(
410 "no response received".to_string(),
411 dialog.id(),
412 rsip::StatusCode::NotAcceptableHere,
413 ));
414 }
415 };
416 Ok((dialog.id(), offer))
417 }
418}