1use crate::call::active_call::ActiveCallStateRef;
2use crate::callrecord::CallRecordHangupReason;
3use crate::useragent::invitation::PendingDialog;
4use anyhow::Result;
5use chrono::Utc;
6use rsipstack::dialog::DialogId;
7use rsipstack::dialog::dialog::{
8 Dialog, DialogState, DialogStateReceiver, DialogStateSender, TerminatedReason,
9};
10use rsipstack::dialog::dialog_layer::DialogLayer;
11use rsipstack::dialog::invitation::InviteOption;
12use rsipstack::rsip_ext::RsipResponseExt;
13use std::collections::HashMap;
14use std::sync::Arc;
15use tokio_util::sync::CancellationToken;
16use tracing::{info, warn};
17use voice_engine::event::EventSender;
18use voice_engine::media::TrackId;
19use voice_engine::media::stream::MediaStream;
20
21pub struct DialogStateReceiverGuard {
22 pub(super) dialog_layer: Arc<DialogLayer>,
23 pub(super) receiver: DialogStateReceiver,
24 pub(super) dialog_id: Option<DialogId>,
25}
26
27impl DialogStateReceiverGuard {
28 pub fn new(dialog_layer: Arc<DialogLayer>, receiver: DialogStateReceiver) -> Self {
29 Self {
30 dialog_layer,
31 receiver,
32 dialog_id: None,
33 }
34 }
35 pub async fn recv(&mut self) -> Option<DialogState> {
36 let state = self.receiver.recv().await;
37 if let Some(ref s) = state {
38 self.dialog_id = Some(s.id().clone());
39 }
40 state
41 }
42
43 fn take_dialog(&mut self) -> Option<Dialog> {
44 let id = match self.dialog_id.take() {
45 Some(id) => id,
46 None => return None,
47 };
48
49 match self.dialog_layer.get_dialog(&id) {
50 Some(dialog) => {
51 info!(%id, "dialog removed on drop");
52 self.dialog_layer.remove_dialog(&id);
53 return Some(dialog);
54 }
55 _ => {}
56 }
57 None
58 }
59
60 pub async fn drop_async(&mut self) {
61 if let Some(dialog) = self.take_dialog() {
62 if let Err(e) = dialog.hangup().await {
63 warn!(id=%dialog.id(), "error hanging up dialog on drop: {}", e);
64 }
65 }
66 }
67}
68
69impl Drop for DialogStateReceiverGuard {
70 fn drop(&mut self) {
71 if let Some(dialog) = self.take_dialog() {
72 tokio::spawn(async move {
73 if let Err(e) = dialog.hangup().await {
74 warn!(id=%dialog.id(), "error hanging up dialog on drop: {}", e);
75 }
76 });
77 }
78 }
79}
80
81pub(super) struct InviteDialogStates {
82 pub is_client: bool,
83 pub session_id: String,
84 pub track_id: TrackId,
85 pub cancel_token: CancellationToken,
86 pub event_sender: EventSender,
87 pub call_state: ActiveCallStateRef,
88 pub media_stream: Arc<MediaStream>,
89 pub terminated_reason: Option<TerminatedReason>,
90}
91
92impl InviteDialogStates {
93 pub(super) fn on_terminated(&mut self) {
94 let mut call_state_ref = match self.call_state.write() {
95 Ok(cs) => cs,
96 Err(_) => {
97 return;
98 }
99 };
100 let reason = &self.terminated_reason;
101 call_state_ref.last_status_code = match reason {
102 Some(TerminatedReason::UacCancel) => 487,
103 Some(TerminatedReason::UacBye) => 200,
104 Some(TerminatedReason::UacBusy) => 486,
105 Some(TerminatedReason::UasBye) => 200,
106 Some(TerminatedReason::UasBusy) => 486,
107 Some(TerminatedReason::UasDecline) => 603,
108 Some(TerminatedReason::UacOther(code)) => code.code(),
109 Some(TerminatedReason::UasOther(code)) => code.code(),
110 _ => 500, };
112
113 if call_state_ref.hangup_reason.is_none() {
114 call_state_ref.hangup_reason.replace(match reason {
115 Some(TerminatedReason::UacCancel) => CallRecordHangupReason::Canceled,
116 Some(TerminatedReason::UacBye) | Some(TerminatedReason::UacBusy) => {
117 CallRecordHangupReason::ByCaller
118 }
119 Some(TerminatedReason::UasBye) | Some(TerminatedReason::UasBusy) => {
120 CallRecordHangupReason::ByCallee
121 }
122 Some(TerminatedReason::UasDecline) => CallRecordHangupReason::ByCallee,
123 Some(TerminatedReason::UacOther(_)) => CallRecordHangupReason::ByCaller,
124 Some(TerminatedReason::UasOther(_)) => CallRecordHangupReason::ByCallee,
125 _ => CallRecordHangupReason::BySystem,
126 });
127 };
128 let initiator = match reason {
129 Some(TerminatedReason::UacCancel) => "caller".to_string(),
130 Some(TerminatedReason::UacBye) | Some(TerminatedReason::UacBusy) => {
131 "caller".to_string()
132 }
133 Some(TerminatedReason::UasBye)
134 | Some(TerminatedReason::UasBusy)
135 | Some(TerminatedReason::UasDecline) => "callee".to_string(),
136 _ => "system".to_string(),
137 };
138 self.event_sender
139 .send(voice_engine::event::SessionEvent::TrackEnd {
140 track_id: self.track_id.clone(),
141 timestamp: voice_engine::media::get_timestamp(),
142 duration: call_state_ref
143 .answer_time
144 .map(|t| (Utc::now() - t).num_milliseconds())
145 .unwrap_or_default() as u64,
146 ssrc: call_state_ref.ssrc,
147 play_id: None,
148 })
149 .ok();
150 let hangup_event =
151 call_state_ref.build_hangup_event(self.track_id.clone(), Some(initiator));
152 self.event_sender.send(hangup_event).ok();
153 }
154}
155
156impl Drop for InviteDialogStates {
157 fn drop(&mut self) {
158 self.on_terminated();
159 self.cancel_token.cancel();
160 }
161}
162
163impl DialogStateReceiverGuard {
164 pub(self) async fn dialog_event_loop(&mut self, states: &mut InviteDialogStates) -> Result<()> {
165 while let Some(event) = self.recv().await {
166 match event {
167 DialogState::Calling(dialog_id) => {
168 info!(session_id=states.session_id, %dialog_id, "dialog calling");
169 states
170 .call_state
171 .as_ref()
172 .write()
173 .map(|mut cs| cs.session_id = dialog_id.to_string())
174 .ok();
175 }
176 DialogState::Trying(_) => {}
177 DialogState::Early(dialog_id, resp) => {
178 let code = resp.status_code.code();
179 let body = resp.body();
180 let answer = String::from_utf8_lossy(body);
181 info!(session_id=states.session_id, %dialog_id, "dialog earlyanswer: \n{}", answer);
182
183 states
184 .call_state
185 .as_ref()
186 .write()
187 .map(|mut cs| {
188 if cs.ring_time.is_none() {
189 cs.ring_time.replace(Utc::now());
190 }
191 cs.last_status_code = code;
192 })
193 .ok();
194
195 if !states.is_client {
196 continue;
197 }
198
199 let refer = states
200 .call_state
201 .read()
202 .map(|cs| cs.is_refer)
203 .unwrap_or(false);
204
205 states
206 .event_sender
207 .send(voice_engine::event::SessionEvent::Ringing {
208 track_id: states.track_id.clone(),
209 timestamp: voice_engine::media::get_timestamp(),
210 early_media: !answer.is_empty(),
211 refer: Some(refer),
212 })?;
213
214 if answer.is_empty() {
215 continue;
216 }
217 states
218 .media_stream
219 .update_remote_description(&states.track_id, &answer.to_string())
220 .await?;
221 }
222 DialogState::Confirmed(dialog_id, _) => {
223 info!(session_id=states.session_id, %dialog_id, "dialog confirmed");
224 states
225 .call_state
226 .as_ref()
227 .write()
228 .map(|mut cs| {
229 cs.session_id = dialog_id.to_string();
230 cs.answer_time.replace(Utc::now());
231 cs.last_status_code = 200;
232 })
233 .ok();
234 }
235 DialogState::Info(dialog_id, req) => {
236 let body_str = String::from_utf8_lossy(req.body());
237 info!(session_id=states.session_id, %dialog_id, body=%body_str, "dialog info received");
238 if body_str.starts_with("Signal=") {
239 let digit = body_str.trim_start_matches("Signal=").chars().next();
240 if let Some(digit) = digit {
241 states
242 .event_sender
243 .send(voice_engine::event::SessionEvent::Dtmf {
244 track_id: states.track_id.clone(),
245 timestamp: voice_engine::media::get_timestamp(),
246 digit: digit.to_string(),
247 })?;
248 }
249 }
250 }
251 DialogState::Terminated(dialog_id, reason) => {
252 info!(
253 session_id = states.session_id,
254 ?dialog_id,
255 ?reason,
256 "dialog terminated"
257 );
258 states.terminated_reason = Some(reason.clone());
259 return Ok(());
260 }
261 other_state => {
262 info!(
263 session_id = states.session_id,
264 %other_state,
265 "dialog received other state"
266 );
267 }
268 }
269 }
270 Ok(())
271 }
272
273 pub(super) async fn process_dialog(&mut self, mut states: InviteDialogStates) {
274 let token = states.cancel_token.clone();
275 tokio::select! {
276 _ = token.cancelled() => {
277 states.terminated_reason = Some(TerminatedReason::UacCancel);
278 }
279 _ = self.dialog_event_loop(&mut states) => {}
280 };
281 self.drop_async().await;
282 }
283}
284
285#[derive(Clone)]
286pub struct Invitation {
287 pub dialog_layer: Arc<DialogLayer>,
288 pub pending_dialogs: Arc<std::sync::Mutex<HashMap<String, PendingDialog>>>,
289}
290
291impl Invitation {
292 pub fn new(dialog_layer: Arc<DialogLayer>) -> Self {
293 Self {
294 dialog_layer,
295 pending_dialogs: Arc::new(std::sync::Mutex::new(HashMap::new())),
296 }
297 }
298
299 pub fn add_pending(&self, session_id: String, pending: PendingDialog) {
300 self.pending_dialogs
301 .lock()
302 .map(|mut ps| ps.insert(session_id, pending))
303 .ok();
304 }
305
306 pub fn get_pending_call(&self, session_id: &String) -> Option<PendingDialog> {
307 self.pending_dialogs
308 .lock()
309 .ok()
310 .map(|mut ps| ps.remove(session_id))
311 .flatten()
312 }
313
314 pub fn has_pending_call(&self, session_id: &str) -> Option<DialogId> {
315 self.pending_dialogs
316 .lock()
317 .ok()
318 .map(|ps| ps.get(session_id).map(|d| d.dialog.id()))
319 .flatten()
320 }
321
322 pub async fn hangup(
323 &self,
324 dialog_id: DialogId,
325 code: Option<rsip::StatusCode>,
326 reason: Option<String>,
327 ) -> Result<()> {
328 if let Some(call) = self.get_pending_call(&dialog_id.to_string()) {
329 call.dialog.reject(code, reason).ok();
330 call.token.cancel();
331 }
332 match self.dialog_layer.get_dialog(&dialog_id) {
333 Some(dialog) => {
334 self.dialog_layer.remove_dialog(&dialog_id);
335 dialog.hangup().await.ok();
336 }
337 None => {}
338 }
339 Ok(())
340 }
341
342 pub async fn reject(&self, dialog_id: DialogId) -> Result<()> {
343 if let Some(call) = self.get_pending_call(&dialog_id.to_string()) {
344 call.dialog.reject(None, None).ok();
345 call.token.cancel();
346 }
347 match self.dialog_layer.get_dialog(&dialog_id) {
348 Some(dialog) => {
349 self.dialog_layer.remove_dialog(&dialog_id);
350 dialog.hangup().await.ok();
351 }
352 None => {}
353 }
354 Ok(())
355 }
356
357 pub async fn invite(
358 &self,
359 invite_option: InviteOption,
360 state_sender: DialogStateSender,
361 ) -> Result<(DialogId, Option<Vec<u8>>), rsipstack::Error> {
362 let (dialog, resp) = self
363 .dialog_layer
364 .do_invite(invite_option, state_sender)
365 .await?;
366
367 let offer = match resp {
368 Some(resp) => match resp.status_code.kind() {
369 rsip::StatusCodeKind::Successful => {
370 let offer = resp.body.clone();
371 Some(offer)
372 }
373 _ => {
374 let reason = resp
375 .reason_phrase()
376 .unwrap_or(&resp.status_code.to_string())
377 .to_string();
378 return Err(rsipstack::Error::DialogError(
379 reason,
380 dialog.id(),
381 resp.status_code,
382 ));
383 }
384 },
385 None => {
386 return Err(rsipstack::Error::DialogError(
387 "no response received".to_string(),
388 dialog.id(),
389 rsip::StatusCode::NotAcceptableHere,
390 ));
391 }
392 };
393 Ok((dialog.id(), offer))
394 }
395}