rsipstack/dialog/
dialog.rs

1use super::{
2    authenticate::{handle_client_authenticate, Credential},
3    client_dialog::ClientInviteDialog,
4    server_dialog::ServerInviteDialog,
5    DialogId,
6};
7use crate::{
8    header_pop,
9    rsip_ext::extract_uri_from_contact,
10    transaction::{
11        endpoint::EndpointInnerRef,
12        key::{TransactionKey, TransactionRole},
13        make_via_branch,
14        transaction::{Transaction, TransactionEventSender},
15    },
16    Result,
17};
18use rsip::{
19    headers::Route,
20    prelude::{HeadersExt, ToTypedHeader, UntypedHeader},
21    typed::{CSeq, Contact, Via},
22    Header, Param, Request, Response, SipMessage, StatusCode,
23};
24use std::sync::{
25    atomic::{AtomicU32, Ordering},
26    Arc, Mutex,
27};
28use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, info, warn};
31
/// SIP Dialog State
///
/// Represents the various states a SIP dialog can be in during its lifecycle.
/// These states follow the SIP dialog state machine as defined in RFC 3261.
///
/// Every variant carries the `DialogId` of the dialog it refers to, so a value
/// received from a `DialogStateReceiver` identifies its dialog on its own.
///
/// # States
///
/// * `Calling` - Initial state when a dialog is created for an outgoing INVITE
/// * `Trying` - Dialog has received a 100 Trying response
/// * `Early` - Dialog is in early state (1xx response received, except 100)
/// * `WaitAck` - Server dialog waiting for ACK after sending 2xx response
/// * `Confirmed` - Dialog is established and confirmed (2xx response received/sent and ACK sent/received)
/// * `Updated` - Dialog received an UPDATE request
/// * `Notify` - Dialog received a NOTIFY request
/// * `Info` - Dialog received an INFO request
/// * `Options` - Dialog received an OPTIONS request
/// * `Terminated` - Dialog has been terminated
///
/// # Stored vs. notification-only states
///
/// `DialogInner::transition` forwards every state to the state channel, but
/// `Updated`, `Notify`, `Info` and `Options` are notifications only: they never
/// replace the state stored in the dialog. Once the stored state is
/// `Terminated`, later transitions leave it unchanged.
///
/// # Examples
///
/// ```rust,no_run
/// use rsipstack::dialog::dialog::DialogState;
/// use rsipstack::dialog::DialogId;
///
/// # fn example() {
/// # let dialog_id = DialogId {
/// #     call_id: "test@example.com".to_string(),
/// #     from_tag: "from-tag".to_string(),
/// #     to_tag: "to-tag".to_string(),
/// # };
/// let state = DialogState::Confirmed(dialog_id);
/// if state.is_confirmed() {
///     println!("Dialog is established");
/// }
/// # }
/// ```
#[derive(Clone)]
pub enum DialogState {
    /// State every `DialogInner` starts in (set by `DialogInner::new`).
    Calling(DialogId),
    /// A 100 Trying response has been received.
    Trying(DialogId),
    /// Early dialog; carries the provisional response that caused the
    /// transition (`do_request` passes the 180/183 response here).
    Early(DialogId, rsip::Response),
    /// Waiting for the ACK; carries a response
    /// (presumably the 2xx that was sent — TODO confirm in the server dialog).
    WaitAck(DialogId, rsip::Response),
    /// The dialog is established.
    Confirmed(DialogId),
    /// Notification: an UPDATE request, carried as the second field.
    Updated(DialogId, rsip::Request),
    /// Notification: a NOTIFY request, carried as the second field.
    Notify(DialogId, rsip::Request),
    /// Notification: an INFO request, carried as the second field.
    Info(DialogId, rsip::Request),
    /// Notification: an OPTIONS request, carried as the second field.
    Options(DialogId, rsip::Request),
    /// The dialog has ended, together with the reason why.
    Terminated(DialogId, TerminatedReason),
}
81
/// Reason a dialog ended, carried by `DialogState::Terminated`.
///
/// The `Display` implementation of `DialogState` prints this value using its
/// `Debug` formatting.
///
/// NOTE(review): within this file only `ProxyAuthRequired` is produced (by
/// `DialogInner::do_request`). The per-variant notes below that are marked
/// "presumably" are inferred from the variant names — confirm against the
/// client/server dialog code before relying on them.
#[derive(Debug, Clone)]
pub enum TerminatedReason {
    /// Presumably: an expected message did not arrive in time.
    Timeout,
    /// Presumably: the caller (UAC) cancelled the call.
    UacCancel,
    /// Presumably: the caller (UAC) ended the call with BYE.
    UacBye,
    /// Presumably: the callee (UAS) ended the call with BYE.
    UasBye,
    /// Presumably: the caller side reported busy.
    UacBusy,
    /// Presumably: the callee side reported busy.
    UasBusy,
    /// Presumably: the callee declined the call.
    UasDecline,
    /// Carries a status code (presumably an error reported by a proxy).
    ProxyError(rsip::StatusCode),
    /// A 401/407 challenge could not be satisfied: either no credential was
    /// configured or the challenge was repeated after credentials were sent.
    ProxyAuthRequired,
    /// Presumably: any other caller-side termination, with an optional status code.
    UacOther(Option<rsip::StatusCode>),
    /// Presumably: any other callee-side termination, with an optional status code.
    UasOther(Option<rsip::StatusCode>),
}
96
/// SIP Dialog
///
/// Represents a SIP dialog which can be either a server-side or client-side INVITE dialog.
/// A dialog is a peer-to-peer SIP relationship between two user agents that persists
/// for some time. Dialogs are established by SIP methods like INVITE.
///
/// The methods on this enum (`id`, `from`, `to`, `remote_contact`, `handle`,
/// `on_remove`, `hangup`) dispatch to whichever variant is held.
///
/// # Variants
///
/// * `ServerInvite` - Server-side INVITE dialog (UAS)
/// * `ClientInvite` - Client-side INVITE dialog (UAC)
///
/// # Examples
///
/// ```rust,no_run
/// use rsipstack::dialog::dialog::Dialog;
///
/// # fn handle_dialog(dialog: Dialog) {
/// match dialog {
///     Dialog::ServerInvite(server_dialog) => {
///         // Handle server dialog
///     },
///     Dialog::ClientInvite(client_dialog) => {
///         // Handle client dialog
///     }
/// }
/// # }
/// ```
#[derive(Clone)]
pub enum Dialog {
    /// Server-side (UAS) INVITE dialog; `hangup` ends it via `bye()`.
    ServerInvite(ServerInviteDialog),
    /// Client-side (UAC) INVITE dialog; `hangup` ends it via its own `hangup()`.
    ClientInvite(ClientInviteDialog),
}
129
/// Internal Dialog State and Management
///
/// `DialogInner` contains the core state and functionality shared between
/// client and server dialogs. It manages dialog state transitions, sequence numbers,
/// routing information, and communication with the transaction layer.
///
/// # Key Responsibilities
///
/// * Managing dialog state transitions
/// * Tracking local and remote sequence numbers
/// * Maintaining routing information (route set, contact URIs)
/// * Handling authentication credentials
/// * Coordinating with the transaction layer
///
/// # Fields
///
/// * `role` - Whether this is a client or server dialog
/// * `cancel_token` - Token for canceling dialog operations
/// * `id` - Unique dialog identifier
/// * `state` - Current dialog state
/// * `local_seq` - Local CSeq number for outgoing requests
/// * `local_contact` - Local contact URI
/// * `remote_contact` - Contact header learned from the remote party, if any
/// * `remote_seq` - Remote CSeq number for incoming requests
/// * `remote_uri` - Remote target URI
/// * `from` - From header value
/// * `to` - To header value
/// * `credential` - Authentication credentials if needed
/// * `route_set` - Route set for request routing
/// * `endpoint_inner` - Reference to the SIP endpoint
/// * `state_sender` - Channel for sending state updates
/// * `tu_sender` - Transaction user sender
/// * `initial_request` - The initial request that created this dialog
pub struct DialogInner {
    /// Client or server side. `new` uses it to pick `remote_uri`, to decide
    /// which request header becomes `from`/`to`, and to order the route set.
    pub role: TransactionRole,
    /// Cancelled by `Dialog::on_remove`.
    pub cancel_token: CancellationToken,
    /// Dialog identifier; `update_remote_tag` rewrites its `to_tag`.
    pub id: Mutex<DialogId>,
    /// Last stored state; starts as `Calling` and is changed through `transition`.
    pub state: Mutex<DialogState>,

    /// Local CSeq counter, seeded from the CSeq of the initial request.
    pub local_seq: AtomicU32,
    /// When set, added as a Contact header to generated requests and responses.
    pub local_contact: Option<rsip::Uri>,
    /// Starts as `None`; when set, `do_request` uses its URI as the destination
    /// if the request carries no usable Route header.
    pub remote_contact: Mutex<Option<rsip::headers::untyped::Contact>>,

    /// Remote CSeq counter; initialised to 0 by `new`.
    pub remote_seq: AtomicU32,
    /// Request-URI for generated requests: the initial request's URI for a
    /// client dialog, the URI of its Contact header for a server dialog.
    pub remote_uri: rsip::Uri,

    /// From header value for generated requests (the initial request's To
    /// header when this is a server dialog).
    pub from: String,
    /// To header value for generated requests (the initial request's From
    /// header when this is a server dialog); rewritten by `update_remote_tag`.
    pub to: Mutex<String>,

    /// Used by `do_request` to answer one 401/407 challenge.
    pub credential: Option<Credential>,
    /// Built from the Record-Route headers of the initial request; reversed
    /// for server dialogs.
    pub route_set: Vec<Route>,
    /// Endpoint handle; supplies Via headers and the User-Agent value.
    pub(super) endpoint_inner: EndpointInnerRef,
    /// Receives every state passed to `transition`.
    pub(super) state_sender: DialogStateSender,
    /// Transaction-user event sender (not used within this file).
    pub(super) tu_sender: TransactionEventSender,
    /// The request that created the dialog; source of the Via reused by
    /// `build_vias_from_request`.
    pub(super) initial_request: Request,
}
185
/// Receiving half of the unbounded channel on which dialog states are published.
pub type DialogStateReceiver = UnboundedReceiver<DialogState>;
/// Sending half of the dialog state channel; `DialogInner::transition` sends
/// every state through it and ignores send failures.
pub type DialogStateSender = UnboundedSender<DialogState>;

/// Shared, reference-counted handle to a `DialogInner`.
pub(super) type DialogInnerRef = Arc<DialogInner>;
190
191impl DialogState {
192    pub fn can_cancel(&self) -> bool {
193        matches!(
194            self,
195            DialogState::Calling(_) | DialogState::Trying(_) | DialogState::Early(_, _)
196        )
197    }
198    pub fn is_confirmed(&self) -> bool {
199        matches!(self, DialogState::Confirmed(_))
200    }
201    pub fn is_terminated(&self) -> bool {
202        matches!(self, DialogState::Terminated(_, _))
203    }
204}
205
206impl DialogInner {
207    pub fn new(
208        role: TransactionRole,
209        id: DialogId,
210        initial_request: Request,
211        endpoint_inner: EndpointInnerRef,
212        state_sender: DialogStateSender,
213        credential: Option<Credential>,
214        local_contact: Option<rsip::Uri>,
215        tu_sender: TransactionEventSender,
216    ) -> Result<Self> {
217        let cseq = initial_request.cseq_header()?.seq()?;
218
219        let remote_uri = match role {
220            TransactionRole::Client => initial_request.uri.clone(),
221            TransactionRole::Server => {
222                extract_uri_from_contact(initial_request.contact_header()?.value())?
223            }
224        };
225
226        let from = initial_request.from_header()?.typed()?;
227        let mut to = initial_request.to_header()?.typed()?;
228        if !to.params.iter().any(|p| matches!(p, Param::Tag(_))) {
229            to.params.push(rsip::Param::Tag(id.to_tag.clone().into()));
230        }
231
232        let (from, to) = match role {
233            TransactionRole::Client => (from.to_string(), to.to_string()),
234            TransactionRole::Server => (to.to_string(), from.to_string()),
235        };
236
237        let mut route_set = vec![];
238        for h in initial_request.headers.iter() {
239            if let Header::RecordRoute(rr) = h {
240                route_set.push(Route::from(rr.value()));
241            }
242        }
243
244        // For UAS (server), route set must be reversed (RFC 3261 section 12.1.1)
245        if role == TransactionRole::Server {
246            route_set.reverse();
247        }
248        Ok(Self {
249            role,
250            cancel_token: CancellationToken::new(),
251            id: Mutex::new(id.clone()),
252            from,
253            to: Mutex::new(to),
254            local_seq: AtomicU32::new(cseq),
255            remote_uri,
256            remote_seq: AtomicU32::new(0),
257            credential,
258            route_set,
259            endpoint_inner,
260            state_sender,
261            tu_sender,
262            state: Mutex::new(DialogState::Calling(id)),
263            initial_request,
264            local_contact,
265            remote_contact: Mutex::new(None),
266        })
267    }
268    pub fn can_cancel(&self) -> bool {
269        self.state.lock().unwrap().can_cancel()
270    }
271    pub fn is_confirmed(&self) -> bool {
272        self.state.lock().unwrap().is_confirmed()
273    }
274    pub fn is_terminated(&self) -> bool {
275        self.state.lock().unwrap().is_terminated()
276    }
277    pub fn get_local_seq(&self) -> u32 {
278        self.local_seq.load(Ordering::Relaxed)
279    }
280    pub fn increment_local_seq(&self) -> u32 {
281        self.local_seq.fetch_add(1, Ordering::Relaxed);
282        self.local_seq.load(Ordering::Relaxed)
283    }
284
285    pub fn update_remote_tag(&self, tag: &str) -> Result<()> {
286        self.id.lock().unwrap().to_tag = tag.to_string();
287        let to: rsip::headers::untyped::To = self.to.lock().unwrap().clone().into();
288        *self.to.lock().unwrap() = to.typed()?.with_tag(tag.to_string().into()).to_string();
289        info!("updating remote tag to: {}", self.to.lock().unwrap());
290        Ok(())
291    }
292
293    pub(super) fn build_vias_from_request(&self) -> Result<Vec<Via>> {
294        let mut vias = vec![];
295        for header in self.initial_request.headers.iter() {
296            if let Header::Via(via) = header {
297                if let Ok(mut typed_via) = via.typed() {
298                    for param in typed_via.params.iter_mut() {
299                        if let Param::Branch(_) = param {
300                            *param = make_via_branch();
301                        }
302                    }
303                    vias.push(typed_via);
304                    return Ok(vias);
305                }
306            }
307        }
308        let via = self.endpoint_inner.get_via(None, None)?;
309        vias.push(via);
310        Ok(vias)
311    }
312
313    pub(super) fn make_request_with_vias(
314        &self,
315        method: rsip::Method,
316        cseq: Option<u32>,
317        vias: Vec<rsip::headers::typed::Via>,
318        headers: Option<Vec<rsip::Header>>,
319        body: Option<Vec<u8>>,
320    ) -> Result<rsip::Request> {
321        let mut headers = headers.unwrap_or_default();
322        let cseq_header = CSeq {
323            seq: cseq.unwrap_or_else(|| self.increment_local_seq()),
324            method,
325        };
326
327        for via in vias {
328            headers.push(Header::Via(via.into()));
329        }
330        headers.push(Header::CallId(
331            self.id.lock().unwrap().call_id.clone().into(),
332        ));
333        headers.push(Header::From(self.from.clone().into()));
334        headers.push(Header::To(self.to.lock().unwrap().clone().into()));
335        headers.push(Header::CSeq(cseq_header.into()));
336        headers.push(Header::UserAgent(
337            self.endpoint_inner.user_agent.clone().into(),
338        ));
339
340        self.local_contact
341            .as_ref()
342            .map(|c| headers.push(Contact::from(c.clone()).into()));
343
344        for route in &self.route_set {
345            headers.push(Header::Route(route.clone()));
346        }
347        headers.push(Header::MaxForwards(70.into()));
348
349        body.as_ref().map(|b| {
350            headers.push(Header::ContentLength((b.len() as u32).into()));
351        });
352
353        let req = rsip::Request {
354            method,
355            uri: self.remote_uri.clone(),
356            headers: headers.into(),
357            body: body.unwrap_or_default(),
358            version: rsip::Version::V2,
359        };
360        Ok(req)
361    }
362
363    pub(super) fn make_request(
364        &self,
365        method: rsip::Method,
366        cseq: Option<u32>,
367        addr: Option<crate::transport::SipAddr>,
368        branch: Option<Param>,
369        headers: Option<Vec<rsip::Header>>,
370        body: Option<Vec<u8>>,
371    ) -> Result<rsip::Request> {
372        let via = self.endpoint_inner.get_via(addr, branch)?;
373        self.make_request_with_vias(method, cseq, vec![via], headers, body)
374    }
375
376    pub(super) fn make_response(
377        &self,
378        request: &Request,
379        status: StatusCode,
380        headers: Option<Vec<rsip::Header>>,
381        body: Option<Vec<u8>>,
382    ) -> rsip::Response {
383        let mut resp_headers = rsip::Headers::default();
384        self.local_contact
385            .as_ref()
386            .map(|c| resp_headers.push(Contact::from(c.clone()).into()));
387
388        for header in request.headers.iter() {
389            match header {
390                Header::Via(via) => {
391                    resp_headers.push(Header::Via(via.clone()));
392                }
393                Header::From(from) => {
394                    resp_headers.push(Header::From(from.clone()));
395                }
396                Header::To(to) => {
397                    let mut to = match to.clone().typed() {
398                        Ok(to) => to,
399                        Err(e) => {
400                            info!("error parsing to header {}", e);
401                            continue;
402                        }
403                    };
404
405                    if status != StatusCode::Trying {
406                        if !to.params.iter().any(|p| matches!(p, Param::Tag(_))) {
407                            to.params.push(rsip::Param::Tag(
408                                self.id.lock().unwrap().to_tag.clone().into(),
409                            ));
410                        }
411                    }
412                    resp_headers.push(Header::To(to.into()));
413                }
414                Header::CSeq(cseq) => {
415                    resp_headers.push(Header::CSeq(cseq.clone()));
416                }
417                Header::CallId(call_id) => {
418                    resp_headers.push(Header::CallId(call_id.clone()));
419                }
420                Header::RecordRoute(rr) => {
421                    // Copy Record-Route headers from request to response (RFC 3261)
422                    resp_headers.push(Header::RecordRoute(rr.clone()));
423                }
424                _ => {}
425            }
426        }
427
428        if let Some(headers) = headers {
429            for header in headers {
430                resp_headers.unique_push(header);
431            }
432        }
433
434        body.as_ref().map(|b| {
435            resp_headers.push(Header::ContentLength((b.len() as u32).into()));
436        });
437
438        resp_headers.unique_push(Header::UserAgent(
439            self.endpoint_inner.user_agent.clone().into(),
440        ));
441
442        Response {
443            status_code: status,
444            headers: resp_headers,
445            body: body.unwrap_or_default(),
446            version: request.version().clone(),
447        }
448    }
449
450    pub(super) async fn do_request(&self, mut request: Request) -> Result<Option<rsip::Response>> {
451        let method = request.method().to_owned();
452        let mut destination = request
453            .route_header()
454            .map(|r| {
455                r.typed()
456                    .ok()
457                    .map(|r| r.uris().first().map(|u| u.uri.clone()))
458                    .flatten()
459            })
460            .flatten();
461
462        if destination.is_none() {
463            if let Some(contact) = self.remote_contact.lock().unwrap().as_ref() {
464                destination = contact.uri().ok();
465            }
466        }
467
468        header_pop!(request.headers, Header::Route);
469
470        let key = TransactionKey::from_request(&request, TransactionRole::Client)?;
471        let mut tx = Transaction::new_client(key, request, self.endpoint_inner.clone(), None);
472        tx.destination = destination.as_ref().map(|d| d.try_into().ok()).flatten();
473
474        match tx.send().await {
475            Ok(_) => {
476                info!(
477                    id = self.id.lock().unwrap().to_string(),
478                    method = %method,
479                    destination=tx.destination.as_ref().map(|d| d.to_string()).as_deref(),
480                    key=%tx.key,
481                    "request sent done",
482                );
483            }
484            Err(e) => {
485                warn!(
486                    id = self.id.lock().unwrap().to_string(),
487                    destination = tx.destination.as_ref().map(|d| d.to_string()).as_deref(),
488                    "failed to send request error: {}\n{}",
489                    e,
490                    tx.original
491                );
492                return Err(e);
493            }
494        }
495        let mut auth_sent = false;
496        while let Some(msg) = tx.receive().await {
497            match msg {
498                SipMessage::Response(resp) => match resp.status_code {
499                    StatusCode::Trying => {
500                        continue;
501                    }
502                    StatusCode::Ringing | StatusCode::SessionProgress => {
503                        self.transition(DialogState::Early(self.id.lock().unwrap().clone(), resp))?;
504                        continue;
505                    }
506                    StatusCode::ProxyAuthenticationRequired | StatusCode::Unauthorized => {
507                        let id = self.id.lock().unwrap().clone();
508                        if auth_sent {
509                            info!(
510                                id = self.id.lock().unwrap().to_string(),
511                                "received {} response after auth sent", resp.status_code
512                            );
513                            self.transition(DialogState::Terminated(
514                                id,
515                                TerminatedReason::ProxyAuthRequired,
516                            ))?;
517                            break;
518                        }
519                        auth_sent = true;
520                        if let Some(cred) = &self.credential {
521                            let new_seq = match method {
522                                rsip::Method::Cancel => self.get_local_seq(),
523                                _ => self.increment_local_seq(),
524                            };
525                            tx = handle_client_authenticate(new_seq, tx, resp, cred).await?;
526                            tx.send().await?;
527                            continue;
528                        } else {
529                            info!(
530                                id = self.id.lock().unwrap().to_string(),
531                                "received 407 response without auth option"
532                            );
533                            self.transition(DialogState::Terminated(
534                                id,
535                                TerminatedReason::ProxyAuthRequired,
536                            ))?;
537                        }
538                    }
539                    _ => {
540                        debug!(
541                            id = self.id.lock().unwrap().to_string(),
542                            method = %method,
543                            "dialog do_request done: {:?}", resp.status_code
544                        );
545                        return Ok(Some(resp));
546                    }
547                },
548                _ => break,
549            }
550        }
551        Ok(None)
552    }
553
554    pub(super) fn transition(&self, state: DialogState) -> Result<()> {
555        // Try to send state update, but don't fail if channel is closed
556        self.state_sender.send(state.clone()).ok();
557
558        match state {
559            DialogState::Updated(_, _)
560            | DialogState::Notify(_, _)
561            | DialogState::Info(_, _)
562            | DialogState::Options(_, _) => {
563                return Ok(());
564            }
565            _ => {}
566        }
567        let mut old_state = self.state.lock().unwrap();
568        match (&*old_state, &state) {
569            (DialogState::Terminated(id, _), _) => {
570                warn!(
571                    %id,
572                    "dialog already terminated, ignoring transition to {}", state
573                );
574                return Ok(());
575            }
576            _ => {}
577        }
578        info!("transitioning state: {} -> {}", old_state, state);
579        *old_state = state;
580        Ok(())
581    }
582}
583
584impl std::fmt::Display for DialogState {
585    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
586        match self {
587            DialogState::Calling(id) => write!(f, "{}(Calling)", id),
588            DialogState::Trying(id) => write!(f, "{}(Trying)", id),
589            DialogState::Early(id, _) => write!(f, "{}(Early)", id),
590            DialogState::WaitAck(id, _) => write!(f, "{}(WaitAck)", id),
591            DialogState::Confirmed(id) => write!(f, "{}(Confirmed)", id),
592            DialogState::Updated(id, _) => write!(f, "{}(Updated)", id),
593            DialogState::Notify(id, _) => write!(f, "{}(Notify)", id),
594            DialogState::Info(id, _) => write!(f, "{}(Info)", id),
595            DialogState::Options(id, _) => write!(f, "{}(Options)", id),
596            DialogState::Terminated(id, reason) => write!(f, "{}(Terminated {:?})", id, reason),
597        }
598    }
599}
600
601impl Dialog {
602    pub fn id(&self) -> DialogId {
603        match self {
604            Dialog::ServerInvite(d) => d.inner.id.lock().unwrap().clone(),
605            Dialog::ClientInvite(d) => d.inner.id.lock().unwrap().clone(),
606        }
607    }
608
609    pub fn from(&self) -> &str {
610        match self {
611            Dialog::ServerInvite(d) => &d.inner.from,
612            Dialog::ClientInvite(d) => &d.inner.from,
613        }
614    }
615
616    pub fn to(&self) -> String {
617        match self {
618            Dialog::ServerInvite(d) => d.inner.to.lock().unwrap().clone(),
619            Dialog::ClientInvite(d) => d.inner.to.lock().unwrap().clone(),
620        }
621    }
622    pub fn remote_contact(&self) -> Option<rsip::Uri> {
623        match self {
624            Dialog::ServerInvite(d) => d
625                .inner
626                .remote_contact
627                .lock()
628                .unwrap()
629                .as_ref()
630                .map(|c| c.uri().ok())
631                .flatten(),
632            Dialog::ClientInvite(d) => d
633                .inner
634                .remote_contact
635                .lock()
636                .unwrap()
637                .as_ref()
638                .map(|c| c.uri().ok())
639                .flatten(),
640        }
641    }
642
643    pub async fn handle(&mut self, tx: &mut Transaction) -> Result<()> {
644        match self {
645            Dialog::ServerInvite(d) => d.handle(tx).await,
646            Dialog::ClientInvite(d) => d.handle(tx).await,
647        }
648    }
649    pub fn on_remove(&self) {
650        match self {
651            Dialog::ServerInvite(d) => {
652                d.inner.cancel_token.cancel();
653            }
654            Dialog::ClientInvite(d) => {
655                d.inner.cancel_token.cancel();
656            }
657        }
658    }
659
660    pub async fn hangup(&self) -> Result<()> {
661        match self {
662            Dialog::ServerInvite(d) => d.bye().await,
663            Dialog::ClientInvite(d) => d.hangup().await,
664        }
665    }
666}