1use super::{
2 authenticate::{handle_client_authenticate, Credential},
3 client_dialog::ClientInviteDialog,
4 server_dialog::ServerInviteDialog,
5 DialogId,
6};
7use crate::{
8 rsip,
9 rsip_ext::extract_uri_from_contact,
10 transaction::{
11 endpoint::EndpointInnerRef,
12 key::{TransactionKey, TransactionRole},
13 make_via_branch,
14 transaction::{Transaction, TransactionEventSender},
15 },
16 transport::SipAddr,
17 Result,
18};
19use rsip::{
20 headers::Route,
21 prelude::{HeadersExt, ToTypedHeader, UntypedHeader},
22 typed::{CSeq, Contact, Via},
23 Header, Param, Request, Response, SipMessage, StatusCode, StatusCodeKind,
24};
25use std::sync::{
26 atomic::{AtomicU32, Ordering},
27 Arc, Mutex,
28};
29use tokio::{
30 sync::mpsc::{UnboundedReceiver, UnboundedSender},
31 time::interval,
32};
33use tokio_util::sync::CancellationToken;
34use tracing::{debug, info, warn};
35
36#[derive(Clone)]
73pub enum DialogState {
74 Calling(DialogId),
75 Trying(DialogId),
76 Early(DialogId, rsip::Response),
77 WaitAck(DialogId, rsip::Response),
78 Confirmed(DialogId, rsip::Response),
79 Updated(DialogId, rsip::Request),
80 Notify(DialogId, rsip::Request),
81 Info(DialogId, rsip::Request),
82 Options(DialogId, rsip::Request),
83 Terminated(DialogId, TerminatedReason),
84}
85
86#[derive(Debug, Clone)]
87pub enum TerminatedReason {
88 Timeout,
89 UacCancel,
90 UacBye,
91 UasBye,
92 UacBusy,
93 UasBusy,
94 UasDecline,
95 ProxyError(rsip::StatusCode),
96 ProxyAuthRequired,
97 UacOther(rsip::StatusCode),
98 UasOther(rsip::StatusCode),
99}
100
101#[derive(Clone)]
129pub enum Dialog {
130 ServerInvite(ServerInviteDialog),
131 ClientInvite(ClientInviteDialog),
132}
133
134pub struct DialogInner {
167 pub role: TransactionRole,
168 pub cancel_token: CancellationToken,
169 pub id: Mutex<DialogId>,
170 pub state: Mutex<DialogState>,
171
172 pub local_seq: AtomicU32,
173 pub local_contact: Option<rsip::Uri>,
174 pub remote_contact: Mutex<Option<rsip::headers::untyped::Contact>>,
175
176 pub remote_seq: AtomicU32,
177 pub remote_uri: Mutex<rsip::Uri>,
178
179 pub from: rsip::typed::From,
180 pub to: Mutex<rsip::typed::To>,
181
182 pub credential: Option<Credential>,
183 pub route_set: Mutex<Vec<Route>>,
184 pub(super) endpoint_inner: EndpointInnerRef,
185 pub(super) state_sender: DialogStateSender,
186 pub(super) tu_sender: TransactionEventSender,
187 pub(super) initial_request: Request,
188 pub(super) initial_destination: Option<SipAddr>,
189}
190
191pub type DialogStateReceiver = UnboundedReceiver<DialogState>;
192pub type DialogStateSender = UnboundedSender<DialogState>;
193
194pub(super) type DialogInnerRef = Arc<DialogInner>;
195
196impl DialogState {
197 pub fn can_cancel(&self) -> bool {
198 matches!(
199 self,
200 DialogState::Calling(_) | DialogState::Trying(_) | DialogState::Early(_, _)
201 )
202 }
203 pub fn is_confirmed(&self) -> bool {
204 matches!(self, DialogState::Confirmed(_, _))
205 }
206 pub fn is_terminated(&self) -> bool {
207 matches!(self, DialogState::Terminated(_, _))
208 }
209}
210
211impl DialogInner {
212 pub fn new(
213 role: TransactionRole,
214 id: DialogId,
215 initial_request: Request,
216 endpoint_inner: EndpointInnerRef,
217 state_sender: DialogStateSender,
218 credential: Option<Credential>,
219 local_contact: Option<rsip::Uri>,
220 tu_sender: TransactionEventSender,
221 ) -> Result<Self> {
222 let cseq = initial_request.cseq_header()?.seq()?;
223
224 let remote_uri = match role {
225 TransactionRole::Client => initial_request.uri.clone(),
226 TransactionRole::Server => {
227 extract_uri_from_contact(initial_request.contact_header()?.value())?
228 }
229 };
230
231 let from = initial_request.from_header()?.typed()?;
232 let mut to = initial_request.to_header()?.typed()?;
233 if !to.params.iter().any(|p| matches!(p, Param::Tag(_))) {
234 to.params.push(rsip::Param::Tag(id.to_tag.clone().into()));
235 }
236
237 let mut route_set = vec![];
238 if endpoint_inner.option.follow_record_route {
239 for h in initial_request.headers.iter() {
240 if let Header::RecordRoute(rr) = h {
241 route_set.push(Route::from(rr.value()));
242 }
243 }
244 }
245
246 Ok(Self {
247 role,
248 cancel_token: CancellationToken::new(),
249 id: Mutex::new(id.clone()),
250 from: from,
251 to: Mutex::new(to),
252 local_seq: AtomicU32::new(cseq),
253 remote_uri: Mutex::new(remote_uri),
254 remote_seq: AtomicU32::new(0),
255 credential,
256 route_set: Mutex::new(route_set),
257 endpoint_inner,
258 state_sender,
259 tu_sender,
260 state: Mutex::new(DialogState::Calling(id)),
261 initial_request,
262 initial_destination: None,
263 local_contact,
264 remote_contact: Mutex::new(None),
265 })
266 }
267 pub fn can_cancel(&self) -> bool {
268 self.state.lock().unwrap().can_cancel()
269 }
270 pub fn is_confirmed(&self) -> bool {
271 self.state.lock().unwrap().is_confirmed()
272 }
273 pub fn is_terminated(&self) -> bool {
274 self.state.lock().unwrap().is_terminated()
275 }
276 pub fn get_local_seq(&self) -> u32 {
277 self.local_seq.load(Ordering::Relaxed)
278 }
279 pub fn increment_local_seq(&self) -> u32 {
280 self.local_seq.fetch_add(1, Ordering::Relaxed);
281 self.local_seq.load(Ordering::Relaxed)
282 }
283
284 pub fn update_remote_tag(&self, tag: &str) -> Result<()> {
285 self.id.lock().unwrap().to_tag = tag.to_string();
286 let mut to = self.to.lock().unwrap();
287 *to = to.clone().with_tag(tag.into());
288 Ok(())
289 }
290
291 pub(super) fn build_vias_from_request(&self) -> Result<Vec<Via>> {
292 let mut vias = vec![];
293 for header in self.initial_request.headers.iter() {
294 if let Header::Via(via) = header {
295 if let Ok(mut typed_via) = via.typed() {
296 for param in typed_via.params.iter_mut() {
297 if let Param::Branch(_) = param {
298 *param = make_via_branch();
299 }
300 }
301 vias.push(typed_via);
302 return Ok(vias);
303 }
304 }
305 }
306 let via = self.endpoint_inner.get_via(None, None)?;
307 vias.push(via);
308 Ok(vias)
309 }
310
311 pub(super) fn make_request_with_vias(
312 &self,
313 method: rsip::Method,
314 cseq: Option<u32>,
315 vias: Vec<rsip::headers::typed::Via>,
316 headers: Option<Vec<rsip::Header>>,
317 body: Option<Vec<u8>>,
318 ) -> Result<rsip::Request> {
319 let mut headers = headers.unwrap_or_default();
320 let cseq_header = CSeq {
321 seq: cseq.unwrap_or_else(|| self.increment_local_seq()),
322 method,
323 };
324
325 for via in vias {
326 headers.push(Header::Via(via.into()));
327 }
328 headers.push(Header::CallId(
329 self.id.lock().unwrap().call_id.clone().into(),
330 ));
331
332 let to = self
333 .to
334 .lock()
335 .unwrap()
336 .clone()
337 .untyped()
338 .value()
339 .to_string();
340
341 let from = self.from.clone().untyped().value().to_string();
342 match self.role {
343 TransactionRole::Client => {
344 headers.push(Header::From(from.into()));
345 headers.push(Header::To(to.into()));
346 }
347 TransactionRole::Server => {
348 headers.push(Header::From(to.into()));
349 headers.push(Header::To(from.into()));
350 }
351 }
352 headers.push(Header::CSeq(cseq_header.into()));
353 headers.push(Header::UserAgent(
354 self.endpoint_inner.user_agent.clone().into(),
355 ));
356
357 self.local_contact
358 .as_ref()
359 .map(|c| headers.push(Contact::from(c.clone()).into()));
360
361 if self.endpoint_inner.option.follow_record_route {
362 let route_set = self.route_set.lock().unwrap();
363 headers.extend(route_set.iter().cloned().map(Header::Route));
364 }
365 headers.push(Header::MaxForwards(70.into()));
366
367 body.as_ref().map(|b| {
368 headers.push(Header::ContentLength((b.len() as u32).into()));
369 });
370
371 let req = rsip::Request {
372 method,
373 uri: self.remote_uri.lock().unwrap().clone(),
374 headers: headers.into(),
375 body: body.unwrap_or_default(),
376 version: rsip::Version::V2,
377 };
378 Ok(req)
379 }
380
381 pub(super) fn make_request(
382 &self,
383 method: rsip::Method,
384 cseq: Option<u32>,
385 addr: Option<crate::transport::SipAddr>,
386 branch: Option<Param>,
387 headers: Option<Vec<rsip::Header>>,
388 body: Option<Vec<u8>>,
389 ) -> Result<rsip::Request> {
390 let via = self.endpoint_inner.get_via(addr, branch)?;
391 self.make_request_with_vias(method, cseq, vec![via], headers, body)
392 }
393
394 pub(super) fn make_response(
395 &self,
396 request: &Request,
397 status: StatusCode,
398 headers: Option<Vec<rsip::Header>>,
399 body: Option<Vec<u8>>,
400 ) -> rsip::Response {
401 let mut resp_headers = rsip::Headers::default();
402
403 for header in request.headers.iter() {
404 match header {
405 Header::Via(via) => {
406 resp_headers.push(Header::Via(via.clone()));
407 }
408 Header::From(from) => {
409 resp_headers.push(Header::From(from.clone()));
410 }
411 Header::To(to) => {
412 let mut to = match to.clone().typed() {
413 Ok(to) => to,
414 Err(e) => {
415 info!("error parsing to header {}", e);
416 continue;
417 }
418 };
419
420 if status != StatusCode::Trying
421 && !to.params.iter().any(|p| matches!(p, Param::Tag(_)))
422 {
423 to.params.push(rsip::Param::Tag(
424 self.id.lock().unwrap().to_tag.clone().into(),
425 ));
426 }
427 resp_headers.push(Header::To(to.into()));
428 }
429 Header::CSeq(cseq) => {
430 resp_headers.push(Header::CSeq(cseq.clone()));
431 }
432 Header::CallId(call_id) => {
433 resp_headers.push(Header::CallId(call_id.clone()));
434 }
435 Header::RecordRoute(rr) => {
436 resp_headers.push(Header::RecordRoute(rr.clone()));
438 }
439 _ => {}
440 }
441 }
442
443 if let Some(headers) = headers {
444 for header in headers {
445 resp_headers.unique_push(header);
446 }
447 }
448
449 resp_headers.retain(|h| {
450 !matches!(
451 h,
452 Header::Contact(_) | Header::ContentLength(_) | Header::UserAgent(_)
453 )
454 });
455
456 self.local_contact
457 .as_ref()
458 .map(|c| resp_headers.push(Contact::from(c.clone()).into()));
459
460 body.as_ref().map(|b| {
461 resp_headers.push(Header::ContentLength((b.len() as u32).into()));
462 });
463
464 resp_headers.push(Header::UserAgent(
465 self.endpoint_inner.user_agent.clone().into(),
466 ));
467
468 Response {
469 status_code: status,
470 headers: resp_headers,
471 body: body.unwrap_or_default(),
472 version: request.version().clone(),
473 }
474 }
475
476 pub(super) async fn do_request(&self, request: Request) -> Result<Option<rsip::Response>> {
477 let method = request.method().to_owned();
478 let destination = self
479 .remote_contact
480 .lock()
481 .unwrap()
482 .as_ref()
483 .and_then(|c| c.uri().ok().as_ref()?.try_into().ok())
484 .or_else(|| self.initial_destination.clone());
485
486 let key = TransactionKey::from_request(&request, TransactionRole::Client)?;
487 let mut tx = Transaction::new_client(key, request, self.endpoint_inner.clone(), None);
488 tx.destination = destination;
489
490 match tx.send().await {
491 Ok(_) => {
492 info!(
493 id = self.id.lock().unwrap().to_string(),
494 method = %method,
495 destination=tx.destination.as_ref().map(|d| d.to_string()).as_deref(),
496 key=%tx.key,
497 "request sent done",
498 );
499 }
500 Err(e) => {
501 warn!(
502 id = self.id.lock().unwrap().to_string(),
503 destination = tx.destination.as_ref().map(|d| d.to_string()).as_deref(),
504 "failed to send request error: {}\n{}",
505 e,
506 tx.original
507 );
508 return Err(e);
509 }
510 }
511 let mut auth_sent = false;
512 while let Some(msg) = tx.receive().await {
513 match msg {
514 SipMessage::Response(resp) => match resp.status_code {
515 StatusCode::Trying => {
516 continue;
517 }
518 StatusCode::Ringing | StatusCode::SessionProgress => {
519 self.transition(DialogState::Early(self.id.lock().unwrap().clone(), resp))?;
520 continue;
521 }
522 StatusCode::ProxyAuthenticationRequired | StatusCode::Unauthorized => {
523 let id = self.id.lock().unwrap().clone();
524 if auth_sent {
525 info!(
526 id = self.id.lock().unwrap().to_string(),
527 "received {} response after auth sent", resp.status_code
528 );
529 self.transition(DialogState::Terminated(
530 id,
531 TerminatedReason::ProxyAuthRequired,
532 ))?;
533 break;
534 }
535 auth_sent = true;
536 if let Some(cred) = &self.credential {
537 let new_seq = match method {
538 rsip::Method::Cancel => self.get_local_seq(),
539 _ => self.increment_local_seq(),
540 };
541 tx = handle_client_authenticate(new_seq, tx, resp, cred).await?;
542 tx.send().await?;
543 continue;
544 } else {
545 info!(
546 id = self.id.lock().unwrap().to_string(),
547 "received 407 response without auth option"
548 );
549 self.transition(DialogState::Terminated(
550 id,
551 TerminatedReason::ProxyAuthRequired,
552 ))?;
553 }
554 }
555 _ => {
556 debug!(
557 id = self.id.lock().unwrap().to_string(),
558 method = %method,
559 "dialog do_request done: {:?}", resp.status_code
560 );
561 return Ok(Some(resp));
562 }
563 },
564 _ => break,
565 }
566 }
567 Ok(None)
568 }
569
570 pub(super) fn transition(&self, state: DialogState) -> Result<()> {
571 self.state_sender.send(state.clone()).ok();
573
574 match state {
575 DialogState::Updated(_, _)
576 | DialogState::Notify(_, _)
577 | DialogState::Info(_, _)
578 | DialogState::Options(_, _) => {
579 return Ok(());
580 }
581 _ => {}
582 }
583 let mut old_state = self.state.lock().unwrap();
584 match (&*old_state, &state) {
585 (DialogState::Terminated(id, _), _) => {
586 warn!(
587 %id,
588 "dialog already terminated, ignoring transition to {}", state
589 );
590 return Ok(());
591 }
592 _ => {}
593 }
594 debug!("transitioning state: {} -> {}", old_state, state);
595 *old_state = state;
596 Ok(())
597 }
598
599 pub(super) fn serve_keepalive_options(dlg_inner: Arc<Self>) {
600 let keepalive = match dlg_inner.endpoint_inner.option.dialog_keepalive_duration {
601 Some(k) => k,
602 None => return,
603 };
604 let token = dlg_inner.cancel_token.child_token();
605 let dlg_ref = dlg_inner.clone();
606
607 tokio::spawn(async move {
608 let mut ticker = interval(keepalive);
609 ticker.tick().await;
611 let keepalive_loop = async {
612 loop {
613 ticker.tick().await;
614 if !dlg_ref.is_confirmed() {
615 return Ok(());
616 }
617 let options = dlg_ref.make_request(
618 rsip::Method::Options,
619 None,
620 None,
621 None,
622 None,
623 None,
624 )?;
625 let id = dlg_ref.id.lock().unwrap().clone();
626 match dlg_ref.do_request(options).await {
627 Ok(Some(resp)) => match resp.status_code.kind() {
628 StatusCodeKind::Provisional | StatusCodeKind::Successful => {
629 continue;
630 }
631 _ => {
632 info!(%id, status = %resp.status_code, "keepalive options failed");
633 }
634 },
635 Ok(None) => {
636 continue;
637 }
638 Err(_) => {}
639 }
640 dlg_ref
641 .transition(DialogState::Terminated(id, TerminatedReason::Timeout))
642 .ok();
643 break;
644 }
645 Ok::<(), crate::Error>(())
646 };
647 tokio::select! {
648 _ = token.cancelled() => {}
649 _ = keepalive_loop =>{}
650 };
651 });
652 }
653}
654
655impl std::fmt::Display for DialogState {
656 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
657 match self {
658 DialogState::Calling(id) => write!(f, "{}(Calling)", id),
659 DialogState::Trying(id) => write!(f, "{}(Trying)", id),
660 DialogState::Early(id, _) => write!(f, "{}(Early)", id),
661 DialogState::WaitAck(id, _) => write!(f, "{}(WaitAck)", id),
662 DialogState::Confirmed(id, _) => write!(f, "{}(Confirmed)", id),
663 DialogState::Updated(id, _) => write!(f, "{}(Updated)", id),
664 DialogState::Notify(id, _) => write!(f, "{}(Notify)", id),
665 DialogState::Info(id, _) => write!(f, "{}(Info)", id),
666 DialogState::Options(id, _) => write!(f, "{}(Options)", id),
667 DialogState::Terminated(id, reason) => write!(f, "{}(Terminated {:?})", id, reason),
668 }
669 }
670}
671
672impl Dialog {
673 pub fn id(&self) -> DialogId {
674 match self {
675 Dialog::ServerInvite(d) => d.inner.id.lock().unwrap().clone(),
676 Dialog::ClientInvite(d) => d.inner.id.lock().unwrap().clone(),
677 }
678 }
679
680 pub fn from(&self) -> &rsip::typed::From {
681 match self {
682 Dialog::ServerInvite(d) => &d.inner.from,
683 Dialog::ClientInvite(d) => &d.inner.from,
684 }
685 }
686
687 pub fn to(&self) -> rsip::typed::To {
688 match self {
689 Dialog::ServerInvite(d) => d.inner.to.lock().unwrap().clone(),
690 Dialog::ClientInvite(d) => d.inner.to.lock().unwrap().clone(),
691 }
692 }
693 pub fn remote_contact(&self) -> Option<rsip::Uri> {
694 match self {
695 Dialog::ServerInvite(d) => d
696 .inner
697 .remote_contact
698 .lock()
699 .unwrap()
700 .as_ref()
701 .map(|c| c.uri().ok())
702 .flatten(),
703 Dialog::ClientInvite(d) => d
704 .inner
705 .remote_contact
706 .lock()
707 .unwrap()
708 .as_ref()
709 .map(|c| c.uri().ok())
710 .flatten(),
711 }
712 }
713
714 pub async fn handle(&mut self, tx: &mut Transaction) -> Result<()> {
715 match self {
716 Dialog::ServerInvite(d) => d.handle(tx).await,
717 Dialog::ClientInvite(d) => d.handle(tx).await,
718 }
719 }
720 pub fn on_remove(&self) {
721 match self {
722 Dialog::ServerInvite(d) => {
723 d.inner.cancel_token.cancel();
724 }
725 Dialog::ClientInvite(d) => {
726 d.inner.cancel_token.cancel();
727 }
728 }
729 }
730
731 pub async fn hangup(&self) -> Result<()> {
732 match self {
733 Dialog::ServerInvite(d) => d.bye().await,
734 Dialog::ClientInvite(d) => d.hangup().await,
735 }
736 }
737
738 pub fn can_cancel(&self) -> bool {
739 match self {
740 Dialog::ServerInvite(d) => d.inner.can_cancel(),
741 Dialog::ClientInvite(d) => d.inner.can_cancel(),
742 }
743 }
744}