1use super::{
2 authenticate::{handle_client_authenticate, Credential},
3 client_dialog::ClientInviteDialog,
4 server_dialog::ServerInviteDialog,
5 DialogId,
6};
7use crate::{
8 header_pop,
9 rsip_ext::extract_uri_from_contact,
10 transaction::{
11 endpoint::EndpointInnerRef,
12 key::{TransactionKey, TransactionRole},
13 make_via_branch,
14 transaction::{Transaction, TransactionEventSender},
15 },
16 Result,
17};
18use rsip::{
19 headers::Route,
20 prelude::{HeadersExt, ToTypedHeader, UntypedHeader},
21 typed::{CSeq, Contact, Via},
22 Header, Param, Request, Response, SipMessage, StatusCode,
23};
24use std::sync::{
25 atomic::{AtomicU32, Ordering},
26 Arc, Mutex,
27};
28use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, info, warn};
31
32#[derive(Clone)]
69pub enum DialogState {
70 Calling(DialogId),
71 Trying(DialogId),
72 Early(DialogId, rsip::Response),
73 WaitAck(DialogId, rsip::Response),
74 Confirmed(DialogId),
75 Updated(DialogId, rsip::Request),
76 Notify(DialogId, rsip::Request),
77 Info(DialogId, rsip::Request),
78 Options(DialogId, rsip::Request),
79 Terminated(DialogId, TerminatedReason),
80}
81
82#[derive(Debug, Clone)]
83pub enum TerminatedReason {
84 Timeout,
85 UacCancel,
86 UacBye,
87 UasBye,
88 UacBusy,
89 UasBusy,
90 UasDecline,
91 ProxyError(rsip::StatusCode),
92 ProxyAuthRequired,
93 UacOther(Option<rsip::StatusCode>),
94 UasOther(Option<rsip::StatusCode>),
95}
96
97#[derive(Clone)]
125pub enum Dialog {
126 ServerInvite(ServerInviteDialog),
127 ClientInvite(ClientInviteDialog),
128}
129
130pub struct DialogInner {
163 pub role: TransactionRole,
164 pub cancel_token: CancellationToken,
165 pub id: Mutex<DialogId>,
166 pub state: Mutex<DialogState>,
167
168 pub local_seq: AtomicU32,
169 pub local_contact: Option<rsip::Uri>,
170 pub remote_contact: Mutex<Option<rsip::headers::untyped::Contact>>,
171
172 pub remote_seq: AtomicU32,
173 pub remote_uri: rsip::Uri,
174
175 pub from: String,
176 pub to: Mutex<String>,
177
178 pub credential: Option<Credential>,
179 pub route_set: Vec<Route>,
180 pub(super) endpoint_inner: EndpointInnerRef,
181 pub(super) state_sender: DialogStateSender,
182 pub(super) tu_sender: TransactionEventSender,
183 pub(super) initial_request: Request,
184}
185
186pub type DialogStateReceiver = UnboundedReceiver<DialogState>;
187pub type DialogStateSender = UnboundedSender<DialogState>;
188
189pub(super) type DialogInnerRef = Arc<DialogInner>;
190
191impl DialogState {
192 pub fn can_cancel(&self) -> bool {
193 matches!(
194 self,
195 DialogState::Calling(_) | DialogState::Trying(_) | DialogState::Early(_, _)
196 )
197 }
198 pub fn is_confirmed(&self) -> bool {
199 matches!(self, DialogState::Confirmed(_))
200 }
201 pub fn is_terminated(&self) -> bool {
202 matches!(self, DialogState::Terminated(_, _))
203 }
204}
205
206impl DialogInner {
207 pub fn new(
208 role: TransactionRole,
209 id: DialogId,
210 initial_request: Request,
211 endpoint_inner: EndpointInnerRef,
212 state_sender: DialogStateSender,
213 credential: Option<Credential>,
214 local_contact: Option<rsip::Uri>,
215 tu_sender: TransactionEventSender,
216 ) -> Result<Self> {
217 let cseq = initial_request.cseq_header()?.seq()?;
218
219 let remote_uri = match role {
220 TransactionRole::Client => initial_request.uri.clone(),
221 TransactionRole::Server => {
222 extract_uri_from_contact(initial_request.contact_header()?.value())?
223 }
224 };
225
226 let from = initial_request.from_header()?.typed()?;
227 let mut to = initial_request.to_header()?.typed()?;
228 if !to.params.iter().any(|p| matches!(p, Param::Tag(_))) {
229 to.params.push(rsip::Param::Tag(id.to_tag.clone().into()));
230 }
231
232 let (from, to) = match role {
233 TransactionRole::Client => (from.to_string(), to.to_string()),
234 TransactionRole::Server => (to.to_string(), from.to_string()),
235 };
236
237 let mut route_set = vec![];
238 for h in initial_request.headers.iter() {
239 if let Header::RecordRoute(rr) = h {
240 route_set.push(Route::from(rr.value()));
241 }
242 }
243
244 if role == TransactionRole::Server {
246 route_set.reverse();
247 }
248 Ok(Self {
249 role,
250 cancel_token: CancellationToken::new(),
251 id: Mutex::new(id.clone()),
252 from,
253 to: Mutex::new(to),
254 local_seq: AtomicU32::new(cseq),
255 remote_uri,
256 remote_seq: AtomicU32::new(0),
257 credential,
258 route_set,
259 endpoint_inner,
260 state_sender,
261 tu_sender,
262 state: Mutex::new(DialogState::Calling(id)),
263 initial_request,
264 local_contact,
265 remote_contact: Mutex::new(None),
266 })
267 }
268 pub fn can_cancel(&self) -> bool {
269 self.state.lock().unwrap().can_cancel()
270 }
271 pub fn is_confirmed(&self) -> bool {
272 self.state.lock().unwrap().is_confirmed()
273 }
274 pub fn is_terminated(&self) -> bool {
275 self.state.lock().unwrap().is_terminated()
276 }
277 pub fn get_local_seq(&self) -> u32 {
278 self.local_seq.load(Ordering::Relaxed)
279 }
280 pub fn increment_local_seq(&self) -> u32 {
281 self.local_seq.fetch_add(1, Ordering::Relaxed);
282 self.local_seq.load(Ordering::Relaxed)
283 }
284
285 pub fn update_remote_tag(&self, tag: &str) -> Result<()> {
286 self.id.lock().unwrap().to_tag = tag.to_string();
287 let to: rsip::headers::untyped::To = self.to.lock().unwrap().clone().into();
288 *self.to.lock().unwrap() = to.typed()?.with_tag(tag.to_string().into()).to_string();
289 info!("updating remote tag to: {}", self.to.lock().unwrap());
290 Ok(())
291 }
292
293 pub(super) fn build_vias_from_request(&self) -> Result<Vec<Via>> {
294 let mut vias = vec![];
295 for header in self.initial_request.headers.iter() {
296 if let Header::Via(via) = header {
297 if let Ok(mut typed_via) = via.typed() {
298 for param in typed_via.params.iter_mut() {
299 if let Param::Branch(_) = param {
300 *param = make_via_branch();
301 }
302 }
303 vias.push(typed_via);
304 return Ok(vias);
305 }
306 }
307 }
308 let via = self.endpoint_inner.get_via(None, None)?;
309 vias.push(via);
310 Ok(vias)
311 }
312
313 pub(super) fn make_request_with_vias(
314 &self,
315 method: rsip::Method,
316 cseq: Option<u32>,
317 vias: Vec<rsip::headers::typed::Via>,
318 headers: Option<Vec<rsip::Header>>,
319 body: Option<Vec<u8>>,
320 ) -> Result<rsip::Request> {
321 let mut headers = headers.unwrap_or_default();
322 let cseq_header = CSeq {
323 seq: cseq.unwrap_or_else(|| self.increment_local_seq()),
324 method,
325 };
326
327 for via in vias {
328 headers.push(Header::Via(via.into()));
329 }
330 headers.push(Header::CallId(
331 self.id.lock().unwrap().call_id.clone().into(),
332 ));
333 headers.push(Header::From(self.from.clone().into()));
334 headers.push(Header::To(self.to.lock().unwrap().clone().into()));
335 headers.push(Header::CSeq(cseq_header.into()));
336 headers.push(Header::UserAgent(
337 self.endpoint_inner.user_agent.clone().into(),
338 ));
339
340 self.local_contact
341 .as_ref()
342 .map(|c| headers.push(Contact::from(c.clone()).into()));
343
344 for route in &self.route_set {
345 headers.push(Header::Route(route.clone()));
346 }
347 headers.push(Header::MaxForwards(70.into()));
348
349 body.as_ref().map(|b| {
350 headers.push(Header::ContentLength((b.len() as u32).into()));
351 });
352
353 let req = rsip::Request {
354 method,
355 uri: self.remote_uri.clone(),
356 headers: headers.into(),
357 body: body.unwrap_or_default(),
358 version: rsip::Version::V2,
359 };
360 Ok(req)
361 }
362
363 pub(super) fn make_request(
364 &self,
365 method: rsip::Method,
366 cseq: Option<u32>,
367 addr: Option<crate::transport::SipAddr>,
368 branch: Option<Param>,
369 headers: Option<Vec<rsip::Header>>,
370 body: Option<Vec<u8>>,
371 ) -> Result<rsip::Request> {
372 let via = self.endpoint_inner.get_via(addr, branch)?;
373 self.make_request_with_vias(method, cseq, vec![via], headers, body)
374 }
375
376 pub(super) fn make_response(
377 &self,
378 request: &Request,
379 status: StatusCode,
380 headers: Option<Vec<rsip::Header>>,
381 body: Option<Vec<u8>>,
382 ) -> rsip::Response {
383 let mut resp_headers = rsip::Headers::default();
384 self.local_contact
385 .as_ref()
386 .map(|c| resp_headers.push(Contact::from(c.clone()).into()));
387
388 for header in request.headers.iter() {
389 match header {
390 Header::Via(via) => {
391 resp_headers.push(Header::Via(via.clone()));
392 }
393 Header::From(from) => {
394 resp_headers.push(Header::From(from.clone()));
395 }
396 Header::To(to) => {
397 let mut to = match to.clone().typed() {
398 Ok(to) => to,
399 Err(e) => {
400 info!("error parsing to header {}", e);
401 continue;
402 }
403 };
404
405 if status != StatusCode::Trying {
406 if !to.params.iter().any(|p| matches!(p, Param::Tag(_))) {
407 to.params.push(rsip::Param::Tag(
408 self.id.lock().unwrap().to_tag.clone().into(),
409 ));
410 }
411 }
412 resp_headers.push(Header::To(to.into()));
413 }
414 Header::CSeq(cseq) => {
415 resp_headers.push(Header::CSeq(cseq.clone()));
416 }
417 Header::CallId(call_id) => {
418 resp_headers.push(Header::CallId(call_id.clone()));
419 }
420 Header::RecordRoute(rr) => {
421 resp_headers.push(Header::RecordRoute(rr.clone()));
423 }
424 _ => {}
425 }
426 }
427
428 if let Some(headers) = headers {
429 for header in headers {
430 resp_headers.unique_push(header);
431 }
432 }
433
434 body.as_ref().map(|b| {
435 resp_headers.push(Header::ContentLength((b.len() as u32).into()));
436 });
437
438 resp_headers.unique_push(Header::UserAgent(
439 self.endpoint_inner.user_agent.clone().into(),
440 ));
441
442 Response {
443 status_code: status,
444 headers: resp_headers,
445 body: body.unwrap_or_default(),
446 version: request.version().clone(),
447 }
448 }
449
450 pub(super) async fn do_request(&self, mut request: Request) -> Result<Option<rsip::Response>> {
451 let method = request.method().to_owned();
452 let mut destination = request
453 .route_header()
454 .map(|r| {
455 r.typed()
456 .ok()
457 .map(|r| r.uris().first().map(|u| u.uri.clone()))
458 .flatten()
459 })
460 .flatten();
461
462 if destination.is_none() {
463 if let Some(contact) = self.remote_contact.lock().unwrap().as_ref() {
464 destination = contact.uri().ok();
465 }
466 }
467
468 header_pop!(request.headers, Header::Route);
469
470 let key = TransactionKey::from_request(&request, TransactionRole::Client)?;
471 let mut tx = Transaction::new_client(key, request, self.endpoint_inner.clone(), None);
472 tx.destination = destination.as_ref().map(|d| d.try_into().ok()).flatten();
473
474 match tx.send().await {
475 Ok(_) => {
476 info!(
477 id = self.id.lock().unwrap().to_string(),
478 method = %method,
479 destination=tx.destination.as_ref().map(|d| d.to_string()).as_deref(),
480 key=%tx.key,
481 "request sent done",
482 );
483 }
484 Err(e) => {
485 warn!(
486 id = self.id.lock().unwrap().to_string(),
487 destination = tx.destination.as_ref().map(|d| d.to_string()).as_deref(),
488 "failed to send request error: {}\n{}",
489 e,
490 tx.original
491 );
492 return Err(e);
493 }
494 }
495 let mut auth_sent = false;
496 while let Some(msg) = tx.receive().await {
497 match msg {
498 SipMessage::Response(resp) => match resp.status_code {
499 StatusCode::Trying => {
500 continue;
501 }
502 StatusCode::Ringing | StatusCode::SessionProgress => {
503 self.transition(DialogState::Early(self.id.lock().unwrap().clone(), resp))?;
504 continue;
505 }
506 StatusCode::ProxyAuthenticationRequired | StatusCode::Unauthorized => {
507 let id = self.id.lock().unwrap().clone();
508 if auth_sent {
509 info!(
510 id = self.id.lock().unwrap().to_string(),
511 "received {} response after auth sent", resp.status_code
512 );
513 self.transition(DialogState::Terminated(
514 id,
515 TerminatedReason::ProxyAuthRequired,
516 ))?;
517 break;
518 }
519 auth_sent = true;
520 if let Some(cred) = &self.credential {
521 let new_seq = match method {
522 rsip::Method::Cancel => self.get_local_seq(),
523 _ => self.increment_local_seq(),
524 };
525 tx = handle_client_authenticate(new_seq, tx, resp, cred).await?;
526 tx.send().await?;
527 continue;
528 } else {
529 info!(
530 id = self.id.lock().unwrap().to_string(),
531 "received 407 response without auth option"
532 );
533 self.transition(DialogState::Terminated(
534 id,
535 TerminatedReason::ProxyAuthRequired,
536 ))?;
537 }
538 }
539 _ => {
540 debug!(
541 id = self.id.lock().unwrap().to_string(),
542 method = %method,
543 "dialog do_request done: {:?}", resp.status_code
544 );
545 return Ok(Some(resp));
546 }
547 },
548 _ => break,
549 }
550 }
551 Ok(None)
552 }
553
554 pub(super) fn transition(&self, state: DialogState) -> Result<()> {
555 self.state_sender.send(state.clone()).ok();
557
558 match state {
559 DialogState::Updated(_, _)
560 | DialogState::Notify(_, _)
561 | DialogState::Info(_, _)
562 | DialogState::Options(_, _) => {
563 return Ok(());
564 }
565 _ => {}
566 }
567 let mut old_state = self.state.lock().unwrap();
568 match (&*old_state, &state) {
569 (DialogState::Terminated(id, _), _) => {
570 warn!(
571 %id,
572 "dialog already terminated, ignoring transition to {}", state
573 );
574 return Ok(());
575 }
576 _ => {}
577 }
578 info!("transitioning state: {} -> {}", old_state, state);
579 *old_state = state;
580 Ok(())
581 }
582}
583
584impl std::fmt::Display for DialogState {
585 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
586 match self {
587 DialogState::Calling(id) => write!(f, "{}(Calling)", id),
588 DialogState::Trying(id) => write!(f, "{}(Trying)", id),
589 DialogState::Early(id, _) => write!(f, "{}(Early)", id),
590 DialogState::WaitAck(id, _) => write!(f, "{}(WaitAck)", id),
591 DialogState::Confirmed(id) => write!(f, "{}(Confirmed)", id),
592 DialogState::Updated(id, _) => write!(f, "{}(Updated)", id),
593 DialogState::Notify(id, _) => write!(f, "{}(Notify)", id),
594 DialogState::Info(id, _) => write!(f, "{}(Info)", id),
595 DialogState::Options(id, _) => write!(f, "{}(Options)", id),
596 DialogState::Terminated(id, reason) => write!(f, "{}(Terminated {:?})", id, reason),
597 }
598 }
599}
600
601impl Dialog {
602 pub fn id(&self) -> DialogId {
603 match self {
604 Dialog::ServerInvite(d) => d.inner.id.lock().unwrap().clone(),
605 Dialog::ClientInvite(d) => d.inner.id.lock().unwrap().clone(),
606 }
607 }
608
609 pub fn from(&self) -> &str {
610 match self {
611 Dialog::ServerInvite(d) => &d.inner.from,
612 Dialog::ClientInvite(d) => &d.inner.from,
613 }
614 }
615
616 pub fn to(&self) -> String {
617 match self {
618 Dialog::ServerInvite(d) => d.inner.to.lock().unwrap().clone(),
619 Dialog::ClientInvite(d) => d.inner.to.lock().unwrap().clone(),
620 }
621 }
622 pub fn remote_contact(&self) -> Option<rsip::Uri> {
623 match self {
624 Dialog::ServerInvite(d) => d
625 .inner
626 .remote_contact
627 .lock()
628 .unwrap()
629 .as_ref()
630 .map(|c| c.uri().ok())
631 .flatten(),
632 Dialog::ClientInvite(d) => d
633 .inner
634 .remote_contact
635 .lock()
636 .unwrap()
637 .as_ref()
638 .map(|c| c.uri().ok())
639 .flatten(),
640 }
641 }
642
643 pub async fn handle(&mut self, tx: &mut Transaction) -> Result<()> {
644 match self {
645 Dialog::ServerInvite(d) => d.handle(tx).await,
646 Dialog::ClientInvite(d) => d.handle(tx).await,
647 }
648 }
649 pub fn on_remove(&self) {
650 match self {
651 Dialog::ServerInvite(d) => {
652 d.inner.cancel_token.cancel();
653 }
654 Dialog::ClientInvite(d) => {
655 d.inner.cancel_token.cancel();
656 }
657 }
658 }
659
660 pub async fn hangup(&self) -> Result<()> {
661 match self {
662 Dialog::ServerInvite(d) => d.bye().await,
663 Dialog::ClientInvite(d) => d.hangup().await,
664 }
665 }
666}