ftth_rsipstack/dialog/
dialog.rs

1use super::{
2    authenticate::{handle_client_authenticate, Credential},
3    client_dialog::ClientInviteDialog,
4    server_dialog::ServerInviteDialog,
5    DialogId,
6};
7use crate::{
8    rsip_ext::extract_uri_from_contact,
9    transaction::{
10        endpoint::EndpointInnerRef,
11        key::{TransactionKey, TransactionRole},
12        make_via_branch,
13        transaction::{Transaction, TransactionEventSender},
14    },
15    transport::SipAddr,
16    Result,
17};
18use rsip::{
19    headers::Route,
20    prelude::{HeadersExt, ToTypedHeader, UntypedHeader},
21    typed::{CSeq, Contact, Via},
22    Header, Param, Request, Response, SipMessage, StatusCode, StatusCodeKind,
23};
24use std::sync::{
25    atomic::{AtomicU32, Ordering},
26    Arc, Mutex,
27};
28use tokio::{
29    sync::mpsc::{UnboundedReceiver, UnboundedSender},
30    time::interval,
31};
32use tokio_util::sync::CancellationToken;
33use tracing::{debug, info, warn};
34
35/// SIP Dialog State
36///
37/// Represents the various states a SIP dialog can be in during its lifecycle.
38/// These states follow the SIP dialog state machine as defined in RFC 3261.
39///
40/// # States
41///
42/// * `Calling` - Initial state when a dialog is created for an outgoing INVITE
43/// * `Trying` - Dialog has received a 100 Trying response
44/// * `Early` - Dialog is in early state (1xx response received, except 100)
45/// * `WaitAck` - Server dialog waiting for ACK after sending 2xx response
46/// * `Confirmed` - Dialog is established and confirmed (2xx response received/sent and ACK sent/received)
47/// * `Updated` - Dialog received an UPDATE request
48/// * `Notify` - Dialog received a NOTIFY request  
49/// * `Info` - Dialog received an INFO request
50/// * `Options` - Dialog received an OPTIONS request
51/// * `Terminated` - Dialog has been terminated
52///
53/// # Examples
54///
55/// ```rust,no_run
56/// use rsipstack::dialog::dialog::DialogState;
57/// use rsipstack::dialog::DialogId;
58///
59/// # fn example() {
60/// # let dialog_id = DialogId {
61/// #     call_id: "test@example.com".to_string(),
62/// #     from_tag: "from-tag".to_string(),
63/// #     to_tag: "to-tag".to_string(),
64/// # };
65/// let state = DialogState::Confirmed(dialog_id, rsip::Response::default());
66/// if state.is_confirmed() {
67///     println!("Dialog is established");
68/// }
69/// # }
70/// ```
#[derive(Clone)]
pub enum DialogState {
    /// Initial state; `DialogInner::new` starts every dialog here.
    Calling(DialogId),
    /// A 100 Trying response has been received.
    Trying(DialogId),
    /// Early dialog; carries the provisional (non-100 1xx) response.
    Early(DialogId, rsip::Response),
    /// Server dialog waiting for the ACK after sending a 2xx; carries that response.
    WaitAck(DialogId, rsip::Response),
    /// Dialog is established; carries the 2xx response.
    Confirmed(DialogId, rsip::Response),
    /// An UPDATE request was received; carries the request.
    Updated(DialogId, rsip::Request),
    /// A NOTIFY request was received; carries the request.
    Notify(DialogId, rsip::Request),
    /// An INFO request was received; carries the request.
    Info(DialogId, rsip::Request),
    /// An OPTIONS request was received; carries the request.
    Options(DialogId, rsip::Request),
    /// Dialog has ended; carries the reason for termination.
    Terminated(DialogId, TerminatedReason),
}
84
/// Reason attached to [`DialogState::Terminated`].
///
/// Only `Timeout` and `ProxyAuthRequired` are produced in this file; the remaining
/// variants are set elsewhere.
/// NOTE(review): the `Uac`/`Uas` prefixes presumably name the side that caused the
/// termination — confirm against the client/server dialog implementations.
#[derive(Debug, Clone)]
pub enum TerminatedReason {
    /// Used when a keepalive OPTIONS request fails or is answered with a
    /// non-provisional, non-successful status.
    Timeout,
    UacCancel,
    UacBye,
    UasBye,
    UacBusy,
    UasBusy,
    UasDecline,
    /// Carries the status code involved. NOTE(review): presumably a proxy-generated error — confirm.
    ProxyError(rsip::StatusCode),
    /// Used when a 401/407 challenge cannot be satisfied: either no credential is
    /// configured or a challenge arrives again after credentials were already sent.
    ProxyAuthRequired,
    /// Carries the status code involved.
    UacOther(rsip::StatusCode),
    /// Carries the status code involved.
    UasOther(rsip::StatusCode),
}
99
100/// SIP Dialog
101///
102/// Represents a SIP dialog which can be either a server-side or client-side INVITE dialog.
103/// A dialog is a peer-to-peer SIP relationship between two user agents that persists
104/// for some time. Dialogs are established by SIP methods like INVITE.
105///
106/// # Variants
107///
108/// * `ServerInvite` - Server-side INVITE dialog (UAS)
109/// * `ClientInvite` - Client-side INVITE dialog (UAC)
110///
111/// # Examples
112///
113/// ```rust,no_run
114/// use rsipstack::dialog::dialog::Dialog;
115///
116/// # fn handle_dialog(dialog: Dialog) {
117/// match dialog {
118///     Dialog::ServerInvite(server_dialog) => {
119///         // Handle server dialog
120///     },
121///     Dialog::ClientInvite(client_dialog) => {
122///         // Handle client dialog  
123///     }
124/// }
125/// # }
126/// ```
#[derive(Clone)]
pub enum Dialog {
    /// Server-side INVITE dialog (UAS).
    ServerInvite(ServerInviteDialog),
    /// Client-side INVITE dialog (UAC).
    ClientInvite(ClientInviteDialog),
}
132
133/// Internal Dialog State and Management
134///
135/// `DialogInner` contains the core state and functionality shared between
136/// client and server dialogs. It manages dialog state transitions, sequence numbers,
137/// routing information, and communication with the transaction layer.
138///
139/// # Key Responsibilities
140///
141/// * Managing dialog state transitions
142/// * Tracking local and remote sequence numbers
143/// * Maintaining routing information (route set, contact URIs)
144/// * Handling authentication credentials
145/// * Coordinating with the transaction layer
146///
147/// # Fields
148///
149/// * `role` - Whether this is a client or server dialog
150/// * `cancel_token` - Token for canceling dialog operations
151/// * `id` - Unique dialog identifier
152/// * `state` - Current dialog state
153/// * `local_seq` - Local CSeq number for outgoing requests
154/// * `remote_seq` - Remote CSeq number for incoming requests
155/// * `local_contact` - Local contact URI
156/// * `remote_uri` - Remote target URI
157/// * `from` - From header value
158/// * `to` - To header value
159/// * `credential` - Authentication credentials if needed
160/// * `route_set` - Route set for request routing
161/// * `endpoint_inner` - Reference to the SIP endpoint
162/// * `state_sender` - Channel for sending state updates
163/// * `tu_sender` - Transaction user sender
164/// * `initial_request` - The initial request that created this dialog
pub struct DialogInner {
    /// Whether this is a client or server dialog.
    pub role: TransactionRole,
    /// Token for cancelling dialog operations; the keepalive task runs on a child token.
    pub cancel_token: CancellationToken,
    /// Dialog identifier; its `to_tag` is rewritten by `update_remote_tag`.
    pub id: Mutex<DialogId>,
    /// Current dialog state, updated through `transition`.
    pub state: Mutex<DialogState>,

    /// Local CSeq number for outgoing requests, seeded from the initial request's CSeq.
    pub local_seq: AtomicU32,
    /// Local contact URI; when set, it is added as a Contact header to generated
    /// requests and responses.
    pub local_contact: Option<rsip::Uri>,
    /// Remote Contact header, if known; `do_request` derives the destination from it.
    pub remote_contact: Mutex<Option<rsip::headers::untyped::Contact>>,

    /// Remote CSeq number for incoming requests; starts at 0.
    pub remote_seq: AtomicU32,
    /// Remote target URI, used as the Request-URI of generated requests.
    pub remote_uri: Mutex<rsip::Uri>,

    /// From header value taken from the initial request.
    pub from: rsip::typed::From,
    /// To header value; receives a tag from the dialog id when the initial request had none.
    pub to: Mutex<rsip::typed::To>,

    /// Authentication credentials, used to answer 401/407 challenges.
    pub credential: Option<Credential>,
    /// Route set appended as Route headers to generated requests.
    pub route_set: Mutex<Vec<Route>>,
    /// Reference to the SIP endpoint (Via generation, user agent string, options).
    pub(super) endpoint_inner: EndpointInnerRef,
    /// Channel on which every state passed to `transition` is published.
    pub(super) state_sender: DialogStateSender,
    /// Transaction user sender.
    pub(super) tu_sender: TransactionEventSender,
    /// The initial request that created this dialog.
    pub(super) initial_request: Request,
    /// Fallback destination used by `do_request` when no remote contact URI is usable.
    pub(super) initial_destination: Option<SipAddr>,
}
189
/// Receiving half of the unbounded channel that carries [`DialogState`] updates.
pub type DialogStateReceiver = UnboundedReceiver<DialogState>;
/// Sending half of the unbounded channel that carries [`DialogState`] updates.
pub type DialogStateSender = UnboundedSender<DialogState>;

/// Shared, reference-counted handle to a [`DialogInner`].
pub(super) type DialogInnerRef = Arc<DialogInner>;
194
195impl DialogState {
196    pub fn can_cancel(&self) -> bool {
197        matches!(
198            self,
199            DialogState::Calling(_) | DialogState::Trying(_) | DialogState::Early(_, _)
200        )
201    }
202    pub fn is_confirmed(&self) -> bool {
203        matches!(self, DialogState::Confirmed(_, _))
204    }
205    pub fn is_terminated(&self) -> bool {
206        matches!(self, DialogState::Terminated(_, _))
207    }
208}
209
210impl DialogInner {
211    pub fn new(
212        role: TransactionRole,
213        id: DialogId,
214        initial_request: Request,
215        endpoint_inner: EndpointInnerRef,
216        state_sender: DialogStateSender,
217        credential: Option<Credential>,
218        local_contact: Option<rsip::Uri>,
219        tu_sender: TransactionEventSender,
220    ) -> Result<Self> {
221        let cseq = initial_request.cseq_header()?.seq()?;
222
223        let remote_uri = match role {
224            TransactionRole::Client => initial_request.uri.clone(),
225            TransactionRole::Server => {
226                extract_uri_from_contact(initial_request.contact_header()?.value())?
227            }
228        };
229
230        let from = initial_request.from_header()?.typed()?;
231        let mut to = initial_request.to_header()?.typed()?;
232        if !to.params.iter().any(|p| matches!(p, Param::Tag(_))) {
233            to.params.push(rsip::Param::Tag(id.to_tag.clone().into()));
234        }
235
236        let mut route_set = vec![];
237        for h in initial_request.headers.iter() {
238            if let Header::RecordRoute(rr) = h {
239                route_set.push(Route::from(rr.value()));
240            }
241        }
242
243        Ok(Self {
244            role,
245            cancel_token: CancellationToken::new(),
246            id: Mutex::new(id.clone()),
247            from: from,
248            to: Mutex::new(to),
249            local_seq: AtomicU32::new(cseq),
250            remote_uri: Mutex::new(remote_uri),
251            remote_seq: AtomicU32::new(0),
252            credential,
253            route_set: Mutex::new(route_set),
254            endpoint_inner,
255            state_sender,
256            tu_sender,
257            state: Mutex::new(DialogState::Calling(id)),
258            initial_request,
259            initial_destination: None,
260            local_contact,
261            remote_contact: Mutex::new(None),
262        })
263    }
264    pub fn can_cancel(&self) -> bool {
265        self.state.lock().unwrap().can_cancel()
266    }
267    pub fn is_confirmed(&self) -> bool {
268        self.state.lock().unwrap().is_confirmed()
269    }
270    pub fn is_terminated(&self) -> bool {
271        self.state.lock().unwrap().is_terminated()
272    }
273    pub fn get_local_seq(&self) -> u32 {
274        self.local_seq.load(Ordering::Relaxed)
275    }
276    pub fn increment_local_seq(&self) -> u32 {
277        self.local_seq.fetch_add(1, Ordering::Relaxed);
278        self.local_seq.load(Ordering::Relaxed)
279    }
280
281    pub fn update_remote_tag(&self, tag: &str) -> Result<()> {
282        self.id.lock().unwrap().to_tag = tag.to_string();
283        let mut to = self.to.lock().unwrap();
284        *to = to.clone().with_tag(tag.into());
285        Ok(())
286    }
287
288    pub(super) fn build_vias_from_request(&self) -> Result<Vec<Via>> {
289        let mut vias = vec![];
290        for header in self.initial_request.headers.iter() {
291            if let Header::Via(via) = header {
292                if let Ok(mut typed_via) = via.typed() {
293                    for param in typed_via.params.iter_mut() {
294                        if let Param::Branch(_) = param {
295                            *param = make_via_branch();
296                        }
297                    }
298                    vias.push(typed_via);
299                    return Ok(vias);
300                }
301            }
302        }
303        let via = self.endpoint_inner.get_via(None, None)?;
304        vias.push(via);
305        Ok(vias)
306    }
307
308    pub(super) fn make_request_with_vias(
309        &self,
310        method: rsip::Method,
311        cseq: Option<u32>,
312        vias: Vec<rsip::headers::typed::Via>,
313        headers: Option<Vec<rsip::Header>>,
314        body: Option<Vec<u8>>,
315    ) -> Result<rsip::Request> {
316        let mut headers = headers.unwrap_or_default();
317        let cseq_header = CSeq {
318            seq: cseq.unwrap_or_else(|| self.increment_local_seq()),
319            method,
320        };
321
322        for via in vias {
323            headers.push(Header::Via(via.into()));
324        }
325        headers.push(Header::CallId(
326            self.id.lock().unwrap().call_id.clone().into(),
327        ));
328
329        let to = self
330            .to
331            .lock()
332            .unwrap()
333            .clone()
334            .untyped()
335            .value()
336            .to_string();
337
338        let from = self.from.clone().untyped().value().to_string();
339        match self.role {
340            TransactionRole::Client => {
341                headers.push(Header::From(from.into()));
342                headers.push(Header::To(to.into()));
343            }
344            TransactionRole::Server => {
345                headers.push(Header::From(to.into()));
346                headers.push(Header::To(from.into()));
347            }
348        }
349        headers.push(Header::CSeq(cseq_header.into()));
350        headers.push(Header::UserAgent(
351            self.endpoint_inner.user_agent.clone().into(),
352        ));
353
354        self.local_contact
355            .as_ref()
356            .map(|c| headers.push(Contact::from(c.clone()).into()));
357
358        {
359            let route_set = self.route_set.lock().unwrap();
360            headers.extend(route_set.iter().cloned().map(Header::Route));
361        }
362        headers.push(Header::MaxForwards(70.into()));
363
364        body.as_ref().map(|b| {
365            headers.push(Header::ContentLength((b.len() as u32).into()));
366        });
367
368        let req = rsip::Request {
369            method,
370            uri: self.remote_uri.lock().unwrap().clone(),
371            headers: headers.into(),
372            body: body.unwrap_or_default(),
373            version: rsip::Version::V2,
374        };
375        Ok(req)
376    }
377
378    pub(super) fn make_request(
379        &self,
380        method: rsip::Method,
381        cseq: Option<u32>,
382        addr: Option<crate::transport::SipAddr>,
383        branch: Option<Param>,
384        headers: Option<Vec<rsip::Header>>,
385        body: Option<Vec<u8>>,
386    ) -> Result<rsip::Request> {
387        let via = self.endpoint_inner.get_via(addr, branch)?;
388        self.make_request_with_vias(method, cseq, vec![via], headers, body)
389    }
390
391    pub(super) fn make_response(
392        &self,
393        request: &Request,
394        status: StatusCode,
395        headers: Option<Vec<rsip::Header>>,
396        body: Option<Vec<u8>>,
397    ) -> rsip::Response {
398        let mut resp_headers = rsip::Headers::default();
399
400        for header in request.headers.iter() {
401            match header {
402                Header::Via(via) => {
403                    resp_headers.push(Header::Via(via.clone()));
404                }
405                Header::From(from) => {
406                    resp_headers.push(Header::From(from.clone()));
407                }
408                Header::To(to) => {
409                    let mut to = match to.clone().typed() {
410                        Ok(to) => to,
411                        Err(e) => {
412                            info!("error parsing to header {}", e);
413                            continue;
414                        }
415                    };
416
417                    if status != StatusCode::Trying
418                        && !to.params.iter().any(|p| matches!(p, Param::Tag(_)))
419                    {
420                        to.params.push(rsip::Param::Tag(
421                            self.id.lock().unwrap().to_tag.clone().into(),
422                        ));
423                    }
424                    resp_headers.push(Header::To(to.into()));
425                }
426                Header::CSeq(cseq) => {
427                    resp_headers.push(Header::CSeq(cseq.clone()));
428                }
429                Header::CallId(call_id) => {
430                    resp_headers.push(Header::CallId(call_id.clone()));
431                }
432                Header::RecordRoute(rr) => {
433                    // Copy Record-Route headers from request to response (RFC 3261)
434                    resp_headers.push(Header::RecordRoute(rr.clone()));
435                }
436                _ => {}
437            }
438        }
439
440        if let Some(headers) = headers {
441            for header in headers {
442                resp_headers.unique_push(header);
443            }
444        }
445
446        resp_headers.retain(|h| {
447            !matches!(
448                h,
449                Header::Contact(_) | Header::ContentLength(_) | Header::UserAgent(_)
450            )
451        });
452
453        self.local_contact
454            .as_ref()
455            .map(|c| resp_headers.push(Contact::from(c.clone()).into()));
456
457        body.as_ref().map(|b| {
458            resp_headers.push(Header::ContentLength((b.len() as u32).into()));
459        });
460
461        resp_headers.push(Header::UserAgent(
462            self.endpoint_inner.user_agent.clone().into(),
463        ));
464
465        Response {
466            status_code: status,
467            headers: resp_headers,
468            body: body.unwrap_or_default(),
469            version: request.version().clone(),
470        }
471    }
472
473    pub(super) async fn do_request(&self, request: Request) -> Result<Option<rsip::Response>> {
474        let method = request.method().to_owned();
475        let destination = self
476            .remote_contact
477            .lock()
478            .unwrap()
479            .as_ref()
480            .and_then(|c| c.uri().ok().as_ref()?.try_into().ok())
481            .or_else(|| self.initial_destination.clone());
482
483        let key = TransactionKey::from_request(&request, TransactionRole::Client)?;
484        let mut tx = Transaction::new_client(key, request, self.endpoint_inner.clone(), None);
485        tx.destination = destination;
486
487        match tx.send().await {
488            Ok(_) => {
489                info!(
490                    id = self.id.lock().unwrap().to_string(),
491                    method = %method,
492                    destination=tx.destination.as_ref().map(|d| d.to_string()).as_deref(),
493                    key=%tx.key,
494                    "request sent done",
495                );
496            }
497            Err(e) => {
498                warn!(
499                    id = self.id.lock().unwrap().to_string(),
500                    destination = tx.destination.as_ref().map(|d| d.to_string()).as_deref(),
501                    "failed to send request error: {}\n{}",
502                    e,
503                    tx.original
504                );
505                return Err(e);
506            }
507        }
508        let mut auth_sent = false;
509        while let Some(msg) = tx.receive().await {
510            match msg {
511                SipMessage::Response(resp) => match resp.status_code {
512                    StatusCode::Trying => {
513                        continue;
514                    }
515                    StatusCode::Ringing | StatusCode::SessionProgress => {
516                        self.transition(DialogState::Early(self.id.lock().unwrap().clone(), resp))?;
517                        continue;
518                    }
519                    StatusCode::ProxyAuthenticationRequired | StatusCode::Unauthorized => {
520                        let id = self.id.lock().unwrap().clone();
521                        if auth_sent {
522                            info!(
523                                id = self.id.lock().unwrap().to_string(),
524                                "received {} response after auth sent", resp.status_code
525                            );
526                            self.transition(DialogState::Terminated(
527                                id,
528                                TerminatedReason::ProxyAuthRequired,
529                            ))?;
530                            break;
531                        }
532                        auth_sent = true;
533                        if let Some(cred) = &self.credential {
534                            let new_seq = match method {
535                                rsip::Method::Cancel => self.get_local_seq(),
536                                _ => self.increment_local_seq(),
537                            };
538                            tx = handle_client_authenticate(new_seq, tx, resp, cred).await?;
539                            tx.send().await?;
540                            continue;
541                        } else {
542                            info!(
543                                id = self.id.lock().unwrap().to_string(),
544                                "received 407 response without auth option"
545                            );
546                            self.transition(DialogState::Terminated(
547                                id,
548                                TerminatedReason::ProxyAuthRequired,
549                            ))?;
550                        }
551                    }
552                    _ => {
553                        debug!(
554                            id = self.id.lock().unwrap().to_string(),
555                            method = %method,
556                            "dialog do_request done: {:?}", resp.status_code
557                        );
558                        return Ok(Some(resp));
559                    }
560                },
561                _ => break,
562            }
563        }
564        Ok(None)
565    }
566
567    pub(super) fn transition(&self, state: DialogState) -> Result<()> {
568        // Try to send state update, but don't fail if channel is closed
569        self.state_sender.send(state.clone()).ok();
570
571        match state {
572            DialogState::Updated(_, _)
573            | DialogState::Notify(_, _)
574            | DialogState::Info(_, _)
575            | DialogState::Options(_, _) => {
576                return Ok(());
577            }
578            _ => {}
579        }
580        let mut old_state = self.state.lock().unwrap();
581        match (&*old_state, &state) {
582            (DialogState::Terminated(id, _), _) => {
583                warn!(
584                    %id,
585                    "dialog already terminated, ignoring transition to {}", state
586                );
587                return Ok(());
588            }
589            _ => {}
590        }
591        debug!("transitioning state: {} -> {}", old_state, state);
592        *old_state = state;
593        Ok(())
594    }
595
    /// Spawns a background task that periodically probes the dialog with OPTIONS.
    ///
    /// Does nothing when the endpoint has no `dialog_keepalive_duration` configured.
    /// Otherwise, on every tick of that interval (the immediate first tick is
    /// skipped) the task:
    ///
    /// * stops if the dialog is no longer confirmed;
    /// * sends an OPTIONS request through `do_request`;
    /// * keeps going on a provisional or successful response, or when no response
    ///   is returned (`Ok(None)`);
    /// * otherwise (any other status, or a send error) transitions the dialog to
    ///   `Terminated(Timeout)` and stops.
    ///
    /// The task also ends as soon as a child of the dialog's cancel token is cancelled.
    pub(super) fn serve_keepalive_options(dlg_inner: Arc<Self>) {
        let keepalive = match dlg_inner.endpoint_inner.option.dialog_keepalive_duration {
            Some(k) => k,
            None => return,
        };
        let token = dlg_inner.cancel_token.child_token();
        let dlg_ref = dlg_inner.clone();

        tokio::spawn(async move {
            let mut ticker = interval(keepalive);
            // skip first tick, which will be reached immediately
            ticker.tick().await;
            let keepalive_loop = async {
                loop {
                    ticker.tick().await;
                    // Keepalives only make sense for an established dialog.
                    if !dlg_ref.is_confirmed() {
                        return Ok(());
                    }
                    // A failure to build the request ends the loop through `?`.
                    let options = dlg_ref.make_request(
                        rsip::Method::Options,
                        None,
                        None,
                        None,
                        None,
                        None,
                    )?;
                    // Snapshot the id before the request so it is available for
                    // logging and for the termination event below.
                    let id = dlg_ref.id.lock().unwrap().clone();
                    match dlg_ref.do_request(options).await {
                        Ok(Some(resp)) => match resp.status_code.kind() {
                            StatusCodeKind::Provisional | StatusCodeKind::Successful => {
                                continue;
                            }
                            _ => {
                                info!(%id, status = %resp.status_code, "keepalive options failed");
                            }
                        },
                        Ok(None) => {
                            continue;
                        }
                        Err(_) => {}
                    }
                    // Reached only on a failing status or a send error.
                    dlg_ref
                        .transition(DialogState::Terminated(id, TerminatedReason::Timeout))
                        .ok();
                    break;
                }
                Ok::<(), crate::Error>(())
            };
            // Whichever finishes first ends the task: cancellation or the loop itself.
            tokio::select! {
                _ = token.cancelled() => {}
                _ = keepalive_loop =>{}
            };
        });
    }
650}
651
652impl std::fmt::Display for DialogState {
653    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
654        match self {
655            DialogState::Calling(id) => write!(f, "{}(Calling)", id),
656            DialogState::Trying(id) => write!(f, "{}(Trying)", id),
657            DialogState::Early(id, _) => write!(f, "{}(Early)", id),
658            DialogState::WaitAck(id, _) => write!(f, "{}(WaitAck)", id),
659            DialogState::Confirmed(id, _) => write!(f, "{}(Confirmed)", id),
660            DialogState::Updated(id, _) => write!(f, "{}(Updated)", id),
661            DialogState::Notify(id, _) => write!(f, "{}(Notify)", id),
662            DialogState::Info(id, _) => write!(f, "{}(Info)", id),
663            DialogState::Options(id, _) => write!(f, "{}(Options)", id),
664            DialogState::Terminated(id, reason) => write!(f, "{}(Terminated {:?})", id, reason),
665        }
666    }
667}
668
669impl Dialog {
670    pub fn id(&self) -> DialogId {
671        match self {
672            Dialog::ServerInvite(d) => d.inner.id.lock().unwrap().clone(),
673            Dialog::ClientInvite(d) => d.inner.id.lock().unwrap().clone(),
674        }
675    }
676
677    pub fn from(&self) -> &rsip::typed::From {
678        match self {
679            Dialog::ServerInvite(d) => &d.inner.from,
680            Dialog::ClientInvite(d) => &d.inner.from,
681        }
682    }
683
684    pub fn to(&self) -> rsip::typed::To {
685        match self {
686            Dialog::ServerInvite(d) => d.inner.to.lock().unwrap().clone(),
687            Dialog::ClientInvite(d) => d.inner.to.lock().unwrap().clone(),
688        }
689    }
690    pub fn remote_contact(&self) -> Option<rsip::Uri> {
691        match self {
692            Dialog::ServerInvite(d) => d
693                .inner
694                .remote_contact
695                .lock()
696                .unwrap()
697                .as_ref()
698                .map(|c| c.uri().ok())
699                .flatten(),
700            Dialog::ClientInvite(d) => d
701                .inner
702                .remote_contact
703                .lock()
704                .unwrap()
705                .as_ref()
706                .map(|c| c.uri().ok())
707                .flatten(),
708        }
709    }
710
711    pub async fn handle(&mut self, tx: &mut Transaction) -> Result<()> {
712        match self {
713            Dialog::ServerInvite(d) => d.handle(tx).await,
714            Dialog::ClientInvite(d) => d.handle(tx).await,
715        }
716    }
717    pub fn on_remove(&self) {
718        match self {
719            Dialog::ServerInvite(d) => {
720                d.inner.cancel_token.cancel();
721            }
722            Dialog::ClientInvite(d) => {
723                d.inner.cancel_token.cancel();
724            }
725        }
726    }
727
728    pub async fn hangup(&self) -> Result<()> {
729        match self {
730            Dialog::ServerInvite(d) => d.bye().await,
731            Dialog::ClientInvite(d) => d.hangup().await,
732        }
733    }
734
735    pub fn can_cancel(&self) -> bool {
736        match self {
737            Dialog::ServerInvite(d) => d.inner.can_cancel(),
738            Dialog::ClientInvite(d) => d.inner.can_cancel(),
739        }
740    }
741}