1use super::{
2 authenticate::{handle_client_authenticate, Credential},
3 client_dialog::ClientInviteDialog,
4 server_dialog::ServerInviteDialog,
5 DialogId,
6};
7use crate::{
8 rsip_ext::extract_uri_from_contact,
9 transaction::{
10 endpoint::EndpointInnerRef,
11 key::{TransactionKey, TransactionRole},
12 make_via_branch,
13 transaction::{Transaction, TransactionEventSender},
14 },
15 transport::SipAddr,
16 Result,
17};
18use rsip::{
19 headers::Route,
20 prelude::{HeadersExt, ToTypedHeader, UntypedHeader},
21 typed::{CSeq, Contact, Via},
22 Header, Param, Request, Response, SipMessage, StatusCode, StatusCodeKind,
23};
24use std::sync::{
25 atomic::{AtomicU32, Ordering},
26 Arc, Mutex,
27};
28use tokio::{
29 sync::mpsc::{UnboundedReceiver, UnboundedSender},
30 time::interval,
31};
32use tokio_util::sync::CancellationToken;
33use tracing::{debug, info, warn};
34
/// Lifecycle states and in-dialog notifications published for a dialog.
///
/// Every variant carries the [`DialogId`] of the dialog it belongs to; most
/// also carry the SIP message that caused the event.
#[derive(Clone)]
pub enum DialogState {
    /// State stored by `DialogInner::new`; cancellable.
    Calling(DialogId),
    /// Cancellable state.
    /// NOTE(review): presumably entered after a 100 Trying — it is not set in
    /// this part of the file; confirm against the client/server dialogs.
    Trying(DialogId),
    /// Cancellable state carrying a provisional response; `do_request` enters
    /// it on 180 Ringing and 183 Session Progress.
    Early(DialogId, rsip::Response),
    /// Carries a response.
    /// NOTE(review): presumably a final response still waiting for its ACK —
    /// confirm against the server dialog.
    WaitAck(DialogId, rsip::Response),
    /// The state for which `is_confirmed` returns `true`; the keepalive task
    /// only keeps running while the dialog is in this state.
    Confirmed(DialogId, rsip::Response),
    /// Notification carrying a request; `transition` forwards it to the state
    /// channel without replacing the stored state.
    Updated(DialogId, rsip::Request),
    /// Notification carrying a request; forwarded without replacing the
    /// stored state.
    Notify(DialogId, rsip::Request),
    /// Notification carrying a request; forwarded without replacing the
    /// stored state.
    Info(DialogId, rsip::Request),
    /// Notification carrying a request; forwarded without replacing the
    /// stored state.
    Options(DialogId, rsip::Request),
    /// Final state; once stored, `transition` ignores further state changes.
    Terminated(DialogId, TerminatedReason),
}
84
/// Why a dialog reached [`DialogState::Terminated`].
///
/// NOTE(review): the `Uac*` / `Uas*` prefixes presumably name the side that
/// caused the termination — only `Timeout` and `ProxyAuthRequired` are
/// produced in this part of the file, so confirm the rest against callers.
#[derive(Debug, Clone)]
pub enum TerminatedReason {
    /// Used by the keepalive task when an OPTIONS probe fails.
    Timeout,
    UacCancel,
    UacBye,
    UasBye,
    UacBusy,
    UasBusy,
    UasDecline,
    /// Carries the status code associated with the failure.
    ProxyError(rsip::StatusCode),
    /// Used by `do_request` when a 401/407 challenge cannot be answered
    /// (no credential) or is received again after credentials were sent.
    ProxyAuthRequired,
    /// Carries the status code associated with the failure.
    UacOther(rsip::StatusCode),
    /// Carries the status code associated with the failure.
    UasOther(rsip::StatusCode),
}
99
/// A SIP INVITE dialog, seen from either the server or the client side.
///
/// The methods on this enum dispatch to the wrapped dialog.
#[derive(Clone)]
pub enum Dialog {
    /// Dialog handled by a [`ServerInviteDialog`].
    ServerInvite(ServerInviteDialog),
    /// Dialog handled by a [`ClientInviteDialog`].
    ClientInvite(ClientInviteDialog),
}
132
/// State shared by the client and server dialog wrappers.
///
/// Fields that change after construction are wrapped in a `Mutex` or an
/// atomic so the struct can be shared behind an `Arc`.
pub struct DialogInner {
    /// Role of this side; decides how From/To are laid out in requests built
    /// by this dialog.
    pub role: TransactionRole,
    /// Cancelled by `Dialog::on_remove`; the keepalive task stops on a child
    /// token of it.
    pub cancel_token: CancellationToken,
    /// Dialog identifier; its `to_tag` is rewritten by `update_remote_tag`.
    pub id: Mutex<DialogId>,
    /// Last stored lifecycle state; starts as `Calling` and is updated by
    /// `transition`.
    pub state: Mutex<DialogState>,

    /// Local CSeq counter, seeded from the CSeq of the initial request.
    pub local_seq: AtomicU32,
    /// URI placed in the Contact header of requests and responses built by
    /// this dialog, when set.
    pub local_contact: Option<rsip::Uri>,
    /// Contact header of the remote side; starts as `None`. When present,
    /// `do_request` tries to derive the destination from its URI.
    pub remote_contact: Mutex<Option<rsip::headers::untyped::Contact>>,

    /// Starts at 0.
    /// NOTE(review): presumably tracks the peer's CSeq — it is maintained
    /// outside this part of the file; confirm.
    pub remote_seq: AtomicU32,
    /// Request-URI used for requests built by this dialog: the initial
    /// request's URI for the client role, the URI of the initial request's
    /// Contact header for the server role.
    pub remote_uri: Mutex<rsip::Uri>,

    /// From header of the initial request.
    pub from: rsip::typed::From,
    /// To header of the initial request; a tag is added at construction when
    /// the header has none.
    pub to: Mutex<rsip::typed::To>,

    /// Credential used by `do_request` to answer a 401/407 challenge.
    pub credential: Option<Credential>,
    /// Route headers appended to every request built by this dialog; seeded
    /// from the Record-Route headers of the initial request.
    pub route_set: Mutex<Vec<Route>>,
    /// Endpoint used to build Via headers, supply the User-Agent value and
    /// create client transactions.
    pub(super) endpoint_inner: EndpointInnerRef,
    /// Channel on which every state passed to `transition` is published.
    pub(super) state_sender: DialogStateSender,
    /// NOTE(review): not used in this part of the file; presumably hands
    /// transaction events to the transaction user — confirm.
    pub(super) tu_sender: TransactionEventSender,
    /// The request that created the dialog.
    pub(super) initial_request: Request,
    /// Fallback destination for `do_request` when the remote Contact yields
    /// none; `None` after `new`.
    pub(super) initial_destination: Option<SipAddr>,
}
189
/// Receiving half of the unbounded channel that carries [`DialogState`] events.
pub type DialogStateReceiver = UnboundedReceiver<DialogState>;
/// Sending half of the unbounded channel that carries [`DialogState`] events.
pub type DialogStateSender = UnboundedSender<DialogState>;

/// Reference-counted handle to a [`DialogInner`].
pub(super) type DialogInnerRef = Arc<DialogInner>;
194
195impl DialogState {
196 pub fn can_cancel(&self) -> bool {
197 matches!(
198 self,
199 DialogState::Calling(_) | DialogState::Trying(_) | DialogState::Early(_, _)
200 )
201 }
202 pub fn is_confirmed(&self) -> bool {
203 matches!(self, DialogState::Confirmed(_, _))
204 }
205 pub fn is_terminated(&self) -> bool {
206 matches!(self, DialogState::Terminated(_, _))
207 }
208}
209
210impl DialogInner {
211 pub fn new(
212 role: TransactionRole,
213 id: DialogId,
214 initial_request: Request,
215 endpoint_inner: EndpointInnerRef,
216 state_sender: DialogStateSender,
217 credential: Option<Credential>,
218 local_contact: Option<rsip::Uri>,
219 tu_sender: TransactionEventSender,
220 ) -> Result<Self> {
221 let cseq = initial_request.cseq_header()?.seq()?;
222
223 let remote_uri = match role {
224 TransactionRole::Client => initial_request.uri.clone(),
225 TransactionRole::Server => {
226 extract_uri_from_contact(initial_request.contact_header()?.value())?
227 }
228 };
229
230 let from = initial_request.from_header()?.typed()?;
231 let mut to = initial_request.to_header()?.typed()?;
232 if !to.params.iter().any(|p| matches!(p, Param::Tag(_))) {
233 to.params.push(rsip::Param::Tag(id.to_tag.clone().into()));
234 }
235
236 let mut route_set = vec![];
237 for h in initial_request.headers.iter() {
238 if let Header::RecordRoute(rr) = h {
239 route_set.push(Route::from(rr.value()));
240 }
241 }
242
243 Ok(Self {
244 role,
245 cancel_token: CancellationToken::new(),
246 id: Mutex::new(id.clone()),
247 from: from,
248 to: Mutex::new(to),
249 local_seq: AtomicU32::new(cseq),
250 remote_uri: Mutex::new(remote_uri),
251 remote_seq: AtomicU32::new(0),
252 credential,
253 route_set: Mutex::new(route_set),
254 endpoint_inner,
255 state_sender,
256 tu_sender,
257 state: Mutex::new(DialogState::Calling(id)),
258 initial_request,
259 initial_destination: None,
260 local_contact,
261 remote_contact: Mutex::new(None),
262 })
263 }
264 pub fn can_cancel(&self) -> bool {
265 self.state.lock().unwrap().can_cancel()
266 }
267 pub fn is_confirmed(&self) -> bool {
268 self.state.lock().unwrap().is_confirmed()
269 }
270 pub fn is_terminated(&self) -> bool {
271 self.state.lock().unwrap().is_terminated()
272 }
273 pub fn get_local_seq(&self) -> u32 {
274 self.local_seq.load(Ordering::Relaxed)
275 }
276 pub fn increment_local_seq(&self) -> u32 {
277 self.local_seq.fetch_add(1, Ordering::Relaxed);
278 self.local_seq.load(Ordering::Relaxed)
279 }
280
281 pub fn update_remote_tag(&self, tag: &str) -> Result<()> {
282 self.id.lock().unwrap().to_tag = tag.to_string();
283 let mut to = self.to.lock().unwrap();
284 *to = to.clone().with_tag(tag.into());
285 Ok(())
286 }
287
288 pub(super) fn build_vias_from_request(&self) -> Result<Vec<Via>> {
289 let mut vias = vec![];
290 for header in self.initial_request.headers.iter() {
291 if let Header::Via(via) = header {
292 if let Ok(mut typed_via) = via.typed() {
293 for param in typed_via.params.iter_mut() {
294 if let Param::Branch(_) = param {
295 *param = make_via_branch();
296 }
297 }
298 vias.push(typed_via);
299 return Ok(vias);
300 }
301 }
302 }
303 let via = self.endpoint_inner.get_via(None, None)?;
304 vias.push(via);
305 Ok(vias)
306 }
307
308 pub(super) fn make_request_with_vias(
309 &self,
310 method: rsip::Method,
311 cseq: Option<u32>,
312 vias: Vec<rsip::headers::typed::Via>,
313 headers: Option<Vec<rsip::Header>>,
314 body: Option<Vec<u8>>,
315 ) -> Result<rsip::Request> {
316 let mut headers = headers.unwrap_or_default();
317 let cseq_header = CSeq {
318 seq: cseq.unwrap_or_else(|| self.increment_local_seq()),
319 method,
320 };
321
322 for via in vias {
323 headers.push(Header::Via(via.into()));
324 }
325 headers.push(Header::CallId(
326 self.id.lock().unwrap().call_id.clone().into(),
327 ));
328
329 let to = self
330 .to
331 .lock()
332 .unwrap()
333 .clone()
334 .untyped()
335 .value()
336 .to_string();
337
338 let from = self.from.clone().untyped().value().to_string();
339 match self.role {
340 TransactionRole::Client => {
341 headers.push(Header::From(from.into()));
342 headers.push(Header::To(to.into()));
343 }
344 TransactionRole::Server => {
345 headers.push(Header::From(to.into()));
346 headers.push(Header::To(from.into()));
347 }
348 }
349 headers.push(Header::CSeq(cseq_header.into()));
350 headers.push(Header::UserAgent(
351 self.endpoint_inner.user_agent.clone().into(),
352 ));
353
354 self.local_contact
355 .as_ref()
356 .map(|c| headers.push(Contact::from(c.clone()).into()));
357
358 {
359 let route_set = self.route_set.lock().unwrap();
360 headers.extend(route_set.iter().cloned().map(Header::Route));
361 }
362 headers.push(Header::MaxForwards(70.into()));
363
364 body.as_ref().map(|b| {
365 headers.push(Header::ContentLength((b.len() as u32).into()));
366 });
367
368 let req = rsip::Request {
369 method,
370 uri: self.remote_uri.lock().unwrap().clone(),
371 headers: headers.into(),
372 body: body.unwrap_or_default(),
373 version: rsip::Version::V2,
374 };
375 Ok(req)
376 }
377
378 pub(super) fn make_request(
379 &self,
380 method: rsip::Method,
381 cseq: Option<u32>,
382 addr: Option<crate::transport::SipAddr>,
383 branch: Option<Param>,
384 headers: Option<Vec<rsip::Header>>,
385 body: Option<Vec<u8>>,
386 ) -> Result<rsip::Request> {
387 let via = self.endpoint_inner.get_via(addr, branch)?;
388 self.make_request_with_vias(method, cseq, vec![via], headers, body)
389 }
390
391 pub(super) fn make_response(
392 &self,
393 request: &Request,
394 status: StatusCode,
395 headers: Option<Vec<rsip::Header>>,
396 body: Option<Vec<u8>>,
397 ) -> rsip::Response {
398 let mut resp_headers = rsip::Headers::default();
399
400 for header in request.headers.iter() {
401 match header {
402 Header::Via(via) => {
403 resp_headers.push(Header::Via(via.clone()));
404 }
405 Header::From(from) => {
406 resp_headers.push(Header::From(from.clone()));
407 }
408 Header::To(to) => {
409 let mut to = match to.clone().typed() {
410 Ok(to) => to,
411 Err(e) => {
412 info!("error parsing to header {}", e);
413 continue;
414 }
415 };
416
417 if status != StatusCode::Trying
418 && !to.params.iter().any(|p| matches!(p, Param::Tag(_)))
419 {
420 to.params.push(rsip::Param::Tag(
421 self.id.lock().unwrap().to_tag.clone().into(),
422 ));
423 }
424 resp_headers.push(Header::To(to.into()));
425 }
426 Header::CSeq(cseq) => {
427 resp_headers.push(Header::CSeq(cseq.clone()));
428 }
429 Header::CallId(call_id) => {
430 resp_headers.push(Header::CallId(call_id.clone()));
431 }
432 Header::RecordRoute(rr) => {
433 resp_headers.push(Header::RecordRoute(rr.clone()));
435 }
436 _ => {}
437 }
438 }
439
440 if let Some(headers) = headers {
441 for header in headers {
442 resp_headers.unique_push(header);
443 }
444 }
445
446 resp_headers.retain(|h| {
447 !matches!(
448 h,
449 Header::Contact(_) | Header::ContentLength(_) | Header::UserAgent(_)
450 )
451 });
452
453 self.local_contact
454 .as_ref()
455 .map(|c| resp_headers.push(Contact::from(c.clone()).into()));
456
457 body.as_ref().map(|b| {
458 resp_headers.push(Header::ContentLength((b.len() as u32).into()));
459 });
460
461 resp_headers.push(Header::UserAgent(
462 self.endpoint_inner.user_agent.clone().into(),
463 ));
464
465 Response {
466 status_code: status,
467 headers: resp_headers,
468 body: body.unwrap_or_default(),
469 version: request.version().clone(),
470 }
471 }
472
473 pub(super) async fn do_request(&self, request: Request) -> Result<Option<rsip::Response>> {
474 let method = request.method().to_owned();
475 let destination = self
476 .remote_contact
477 .lock()
478 .unwrap()
479 .as_ref()
480 .and_then(|c| c.uri().ok().as_ref()?.try_into().ok())
481 .or_else(|| self.initial_destination.clone());
482
483 let key = TransactionKey::from_request(&request, TransactionRole::Client)?;
484 let mut tx = Transaction::new_client(key, request, self.endpoint_inner.clone(), None);
485 tx.destination = destination;
486
487 match tx.send().await {
488 Ok(_) => {
489 info!(
490 id = self.id.lock().unwrap().to_string(),
491 method = %method,
492 destination=tx.destination.as_ref().map(|d| d.to_string()).as_deref(),
493 key=%tx.key,
494 "request sent done",
495 );
496 }
497 Err(e) => {
498 warn!(
499 id = self.id.lock().unwrap().to_string(),
500 destination = tx.destination.as_ref().map(|d| d.to_string()).as_deref(),
501 "failed to send request error: {}\n{}",
502 e,
503 tx.original
504 );
505 return Err(e);
506 }
507 }
508 let mut auth_sent = false;
509 while let Some(msg) = tx.receive().await {
510 match msg {
511 SipMessage::Response(resp) => match resp.status_code {
512 StatusCode::Trying => {
513 continue;
514 }
515 StatusCode::Ringing | StatusCode::SessionProgress => {
516 self.transition(DialogState::Early(self.id.lock().unwrap().clone(), resp))?;
517 continue;
518 }
519 StatusCode::ProxyAuthenticationRequired | StatusCode::Unauthorized => {
520 let id = self.id.lock().unwrap().clone();
521 if auth_sent {
522 info!(
523 id = self.id.lock().unwrap().to_string(),
524 "received {} response after auth sent", resp.status_code
525 );
526 self.transition(DialogState::Terminated(
527 id,
528 TerminatedReason::ProxyAuthRequired,
529 ))?;
530 break;
531 }
532 auth_sent = true;
533 if let Some(cred) = &self.credential {
534 let new_seq = match method {
535 rsip::Method::Cancel => self.get_local_seq(),
536 _ => self.increment_local_seq(),
537 };
538 tx = handle_client_authenticate(new_seq, tx, resp, cred).await?;
539 tx.send().await?;
540 continue;
541 } else {
542 info!(
543 id = self.id.lock().unwrap().to_string(),
544 "received 407 response without auth option"
545 );
546 self.transition(DialogState::Terminated(
547 id,
548 TerminatedReason::ProxyAuthRequired,
549 ))?;
550 }
551 }
552 _ => {
553 debug!(
554 id = self.id.lock().unwrap().to_string(),
555 method = %method,
556 "dialog do_request done: {:?}", resp.status_code
557 );
558 return Ok(Some(resp));
559 }
560 },
561 _ => break,
562 }
563 }
564 Ok(None)
565 }
566
567 pub(super) fn transition(&self, state: DialogState) -> Result<()> {
568 self.state_sender.send(state.clone()).ok();
570
571 match state {
572 DialogState::Updated(_, _)
573 | DialogState::Notify(_, _)
574 | DialogState::Info(_, _)
575 | DialogState::Options(_, _) => {
576 return Ok(());
577 }
578 _ => {}
579 }
580 let mut old_state = self.state.lock().unwrap();
581 match (&*old_state, &state) {
582 (DialogState::Terminated(id, _), _) => {
583 warn!(
584 %id,
585 "dialog already terminated, ignoring transition to {}", state
586 );
587 return Ok(());
588 }
589 _ => {}
590 }
591 debug!("transitioning state: {} -> {}", old_state, state);
592 *old_state = state;
593 Ok(())
594 }
595
596 pub(super) fn serve_keepalive_options(dlg_inner: Arc<Self>) {
597 let keepalive = match dlg_inner.endpoint_inner.option.dialog_keepalive_duration {
598 Some(k) => k,
599 None => return,
600 };
601 let token = dlg_inner.cancel_token.child_token();
602 let dlg_ref = dlg_inner.clone();
603
604 tokio::spawn(async move {
605 let mut ticker = interval(keepalive);
606 ticker.tick().await;
608 let keepalive_loop = async {
609 loop {
610 ticker.tick().await;
611 if !dlg_ref.is_confirmed() {
612 return Ok(());
613 }
614 let options = dlg_ref.make_request(
615 rsip::Method::Options,
616 None,
617 None,
618 None,
619 None,
620 None,
621 )?;
622 let id = dlg_ref.id.lock().unwrap().clone();
623 match dlg_ref.do_request(options).await {
624 Ok(Some(resp)) => match resp.status_code.kind() {
625 StatusCodeKind::Provisional | StatusCodeKind::Successful => {
626 continue;
627 }
628 _ => {
629 info!(%id, status = %resp.status_code, "keepalive options failed");
630 }
631 },
632 Ok(None) => {
633 continue;
634 }
635 Err(_) => {}
636 }
637 dlg_ref
638 .transition(DialogState::Terminated(id, TerminatedReason::Timeout))
639 .ok();
640 break;
641 }
642 Ok::<(), crate::Error>(())
643 };
644 tokio::select! {
645 _ = token.cancelled() => {}
646 _ = keepalive_loop =>{}
647 };
648 });
649 }
650}
651
652impl std::fmt::Display for DialogState {
653 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
654 match self {
655 DialogState::Calling(id) => write!(f, "{}(Calling)", id),
656 DialogState::Trying(id) => write!(f, "{}(Trying)", id),
657 DialogState::Early(id, _) => write!(f, "{}(Early)", id),
658 DialogState::WaitAck(id, _) => write!(f, "{}(WaitAck)", id),
659 DialogState::Confirmed(id, _) => write!(f, "{}(Confirmed)", id),
660 DialogState::Updated(id, _) => write!(f, "{}(Updated)", id),
661 DialogState::Notify(id, _) => write!(f, "{}(Notify)", id),
662 DialogState::Info(id, _) => write!(f, "{}(Info)", id),
663 DialogState::Options(id, _) => write!(f, "{}(Options)", id),
664 DialogState::Terminated(id, reason) => write!(f, "{}(Terminated {:?})", id, reason),
665 }
666 }
667}
668
669impl Dialog {
670 pub fn id(&self) -> DialogId {
671 match self {
672 Dialog::ServerInvite(d) => d.inner.id.lock().unwrap().clone(),
673 Dialog::ClientInvite(d) => d.inner.id.lock().unwrap().clone(),
674 }
675 }
676
677 pub fn from(&self) -> &rsip::typed::From {
678 match self {
679 Dialog::ServerInvite(d) => &d.inner.from,
680 Dialog::ClientInvite(d) => &d.inner.from,
681 }
682 }
683
684 pub fn to(&self) -> rsip::typed::To {
685 match self {
686 Dialog::ServerInvite(d) => d.inner.to.lock().unwrap().clone(),
687 Dialog::ClientInvite(d) => d.inner.to.lock().unwrap().clone(),
688 }
689 }
690 pub fn remote_contact(&self) -> Option<rsip::Uri> {
691 match self {
692 Dialog::ServerInvite(d) => d
693 .inner
694 .remote_contact
695 .lock()
696 .unwrap()
697 .as_ref()
698 .map(|c| c.uri().ok())
699 .flatten(),
700 Dialog::ClientInvite(d) => d
701 .inner
702 .remote_contact
703 .lock()
704 .unwrap()
705 .as_ref()
706 .map(|c| c.uri().ok())
707 .flatten(),
708 }
709 }
710
711 pub async fn handle(&mut self, tx: &mut Transaction) -> Result<()> {
712 match self {
713 Dialog::ServerInvite(d) => d.handle(tx).await,
714 Dialog::ClientInvite(d) => d.handle(tx).await,
715 }
716 }
717 pub fn on_remove(&self) {
718 match self {
719 Dialog::ServerInvite(d) => {
720 d.inner.cancel_token.cancel();
721 }
722 Dialog::ClientInvite(d) => {
723 d.inner.cancel_token.cancel();
724 }
725 }
726 }
727
728 pub async fn hangup(&self) -> Result<()> {
729 match self {
730 Dialog::ServerInvite(d) => d.bye().await,
731 Dialog::ClientInvite(d) => d.hangup().await,
732 }
733 }
734
735 pub fn can_cancel(&self) -> bool {
736 match self {
737 Dialog::ServerInvite(d) => d.inner.can_cancel(),
738 Dialog::ClientInvite(d) => d.inner.can_cancel(),
739 }
740 }
741}