Skip to main content

rsipstack/transaction/
endpoint.rs

1use super::{
2    key::TransactionKey,
3    make_via_branch,
4    timer::Timer,
5    transaction::{Transaction, TransactionEvent, TransactionEventSender},
6    SipConnection, TransactionReceiver, TransactionSender, TransactionTimer,
7};
8use crate::{
9    dialog::DialogId,
10    rsip_ext::destination_from_request,
11    transport::{transport_layer::DomainResolver, SipAddr, TransportEvent, TransportLayer},
12    Error, Result, VERSION,
13};
14use async_trait::async_trait;
15use rsip::{prelude::HeadersExt, SipMessage};
16use std::{
17    collections::HashMap,
18    sync::{Arc, Mutex, RwLock},
19    time::Duration,
20};
21use tokio::{
22    select,
23    sync::mpsc::{error, unbounded_channel},
24};
25use tokio_util::sync::CancellationToken;
26use tracing::{debug, info, trace, warn};
27
28pub trait MessageInspector: Send + Sync {
29    fn before_send(&self, msg: SipMessage, dest: Option<&SipAddr>) -> SipMessage;
30    fn after_received(&self, msg: SipMessage, from: &SipAddr) -> SipMessage;
31}
32
33#[async_trait]
34pub trait TargetLocator: Send + Sync {
35    async fn locate(&self, uri: &rsip::Uri) -> Result<SipAddr>;
36}
37
38#[async_trait]
39pub trait TransportEventInspector: Send + Sync {
40    async fn handle(&self, event: TransportEvent) -> Option<TransportEvent>;
41}
42
43pub struct EndpointOption {
44    pub t1: Duration,
45    pub t4: Duration,
46    pub t1x64: Duration,
47    pub timerc: Duration,
48    pub callid_suffix: Option<String>,
49}
50
51impl Default for EndpointOption {
52    fn default() -> Self {
53        EndpointOption {
54            t1: Duration::from_millis(500),
55            t4: Duration::from_secs(5),
56            t1x64: Duration::from_millis(64 * 500),
57            timerc: Duration::from_secs(180),
58            callid_suffix: None,
59        }
60    }
61}
62
63pub struct EndpointStats {
64    pub running_transactions: usize,
65    pub finished_transactions: usize,
66    pub waiting_ack: usize,
67}
68
69/// SIP Endpoint Core Implementation
70///
71/// `EndpointInner` is the core implementation of a SIP endpoint that manages
72/// transactions, timers, and transport layer communication. It serves as the
73/// central coordination point for all SIP protocol operations.
74///
75/// # Key Responsibilities
76///
77/// * Managing active SIP transactions
78/// * Handling SIP timers (Timer A, B, D, E, F, G, K)
79/// * Coordinating with the transport layer
80/// * Processing incoming and outgoing SIP messages
81/// * Maintaining transaction state and cleanup
82///
83/// # Fields
84///
85/// * `allows` - List of supported SIP methods
86/// * `user_agent` - User-Agent header value for outgoing messages
87/// * `timers` - Timer management system for SIP timers
88/// * `transport_layer` - Transport layer for network communication
89/// * `finished_transactions` - Cache of completed transactions
90/// * `transactions` - Active transaction senders
91/// * `incoming_sender` - Channel for incoming transaction notifications
92/// * `cancel_token` - Cancellation token for graceful shutdown
93/// * `timer_interval` - Interval for timer processing
94/// * `t1`, `t4`, `t1x64` - SIP timer values as per RFC 3261
95///
96/// # Timer Values
97///
98/// * `t1` - RTT estimate (default 500ms)
99/// * `t4` - Maximum duration a message will remain in the network (default 4s)
100/// * `t1x64` - Maximum retransmission timeout (default 32s)
101pub struct EndpointInner {
102    pub allows: Mutex<Option<Vec<rsip::Method>>>,
103    pub user_agent: String,
104    pub timers: Timer<TransactionTimer>,
105    pub transport_layer: TransportLayer,
106    pub finished_transactions: RwLock<HashMap<TransactionKey, Option<SipMessage>>>,
107    pub transactions: RwLock<HashMap<TransactionKey, TransactionEventSender>>,
108    pub waiting_ack: RwLock<HashMap<DialogId, TransactionKey>>,
109    incoming_sender: TransactionSender,
110    incoming_receiver: Mutex<Option<TransactionReceiver>>,
111    cancel_token: CancellationToken,
112    #[allow(dead_code)]
113    timer_interval: Duration,
114    pub(super) message_inspector: Option<Box<dyn MessageInspector>>,
115    pub(super) locator: Option<Box<dyn TargetLocator>>,
116    pub(super) transport_inspector: Option<Box<dyn TransportEventInspector>>,
117    pub option: EndpointOption,
118}
119pub type EndpointInnerRef = Arc<EndpointInner>;
120
121/// SIP Endpoint Builder
122///
123/// `EndpointBuilder` provides a fluent interface for constructing SIP endpoints
124/// with custom configuration. It follows the builder pattern to allow flexible
125/// endpoint configuration.
126///
127/// # Examples
128///
129/// ```rust
130/// use rsipstack::EndpointBuilder;
131/// use std::time::Duration;
132///
133/// let endpoint = EndpointBuilder::new()
134///     .with_user_agent("MyApp/1.0")
135///     .with_timer_interval(Duration::from_millis(10))
136///     .with_allows(vec![rsip::Method::Invite, rsip::Method::Bye])
137///     .build();
138/// ```
139pub struct EndpointBuilder {
140    allows: Vec<rsip::Method>,
141    user_agent: String,
142    transport_layer: Option<TransportLayer>,
143    cancel_token: Option<CancellationToken>,
144    timer_interval: Option<Duration>,
145    option: Option<EndpointOption>,
146    message_inspector: Option<Box<dyn MessageInspector>>,
147    target_locator: Option<Box<dyn TargetLocator>>,
148    transport_inspector: Option<Box<dyn TransportEventInspector>>,
149    domain_resolver: Option<Box<dyn DomainResolver>>,
150}
151
152/// SIP Endpoint
153///
154/// `Endpoint` is the main entry point for SIP protocol operations. It provides
155/// a high-level interface for creating and managing SIP transactions, handling
156/// incoming requests, and coordinating with the transport layer.
157///
158/// # Key Features
159///
160/// * Transaction management and lifecycle
161/// * Automatic timer handling per RFC 3261
162/// * Transport layer abstraction
163/// * Graceful shutdown support
164/// * Incoming request processing
165///
166/// # Examples
167///
168/// ```rust,no_run
169/// use rsipstack::EndpointBuilder;
170/// use tokio_util::sync::CancellationToken;
171///
172/// #[tokio::main]
173/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
174///     let endpoint = EndpointBuilder::new()
175///         .with_user_agent("MyApp/1.0")
176///         .build();
177///     
178///     // Get incoming transactions
179///     let mut incoming = endpoint.incoming_transactions().expect("incoming_transactions");
180///     
181///     // Start the endpoint
182///     let endpoint_inner = endpoint.inner.clone();
183///     tokio::spawn(async move {
184///          endpoint_inner.serve().await.ok();
185///     });
186///     
187///     // Process incoming transactions
188///     while let Some(transaction) = incoming.recv().await {
189///         // Handle transaction
190///         break; // Exit for example
191///     }
192///     
193///     Ok(())
194/// }
195/// ```
196///
197/// # Lifecycle
198///
199/// 1. Create endpoint using `EndpointBuilder`
200/// 2. Start serving with `serve()` method
201/// 3. Process incoming transactions via `incoming_transactions()`
202/// 4. Shutdown gracefully with `shutdown()`
203pub struct Endpoint {
204    pub inner: EndpointInnerRef,
205}
206
207impl EndpointInner {
208    pub fn new(
209        user_agent: String,
210        transport_layer: TransportLayer,
211        cancel_token: CancellationToken,
212        timer_interval: Option<Duration>,
213        allows: Vec<rsip::Method>,
214        option: Option<EndpointOption>,
215        message_inspector: Option<Box<dyn MessageInspector>>,
216        locator: Option<Box<dyn TargetLocator>>,
217        transport_inspector: Option<Box<dyn TransportEventInspector>>,
218    ) -> Arc<Self> {
219        let (incoming_sender, incoming_receiver) = unbounded_channel();
220        Arc::new(EndpointInner {
221            allows: Mutex::new(Some(allows)),
222            user_agent,
223            timers: Timer::new(),
224            transport_layer,
225            transactions: RwLock::new(HashMap::new()),
226            finished_transactions: RwLock::new(HashMap::new()),
227            waiting_ack: RwLock::new(HashMap::new()),
228            timer_interval: timer_interval.unwrap_or(Duration::from_millis(20)),
229            cancel_token,
230            incoming_sender,
231            incoming_receiver: Mutex::new(Some(incoming_receiver)),
232            option: option.unwrap_or_default(),
233            message_inspector,
234            locator,
235            transport_inspector,
236        })
237    }
238
239    pub async fn serve(self: &Arc<Self>) -> Result<()> {
240        select! {
241            _ = self.cancel_token.cancelled() => {},
242            _ = self.process_timer() => {},
243            r = self.clone().process_transport_layer() => {
244                _ = r?;
245            },
246        }
247        Ok(())
248    }
249
250    // process transport layer, receive message from transport layer
251    async fn process_transport_layer(self: Arc<Self>) -> Result<()> {
252        self.transport_layer.serve_listens().await.ok();
253
254        let mut transport_rx = match self
255            .transport_layer
256            .inner
257            .transport_rx
258            .lock()
259            .unwrap()
260            .take()
261        {
262            Some(rx) => rx,
263            None => {
264                return Err(Error::EndpointError("transport_rx not set".to_string()));
265            }
266        };
267
268        while let Some(mut event) = transport_rx.recv().await {
269            if let Some(transport_inspector) = &self.transport_inspector {
270                match transport_inspector.handle(event).await {
271                    Some(e) => {
272                        event = e;
273                    }
274                    None => {
275                        continue;
276                    }
277                }
278            }
279
280            match event {
281                TransportEvent::Incoming(msg, connection, from) => {
282                    match self.on_received_message(msg, connection, &from).await {
283                        Ok(()) => {}
284                        Err(e) => {
285                            warn!(addr = %from, error = %e, "on_received_message error");
286                        }
287                    }
288                }
289                TransportEvent::New(t) => {
290                    debug!(addr=%t.get_addr(), "new connection");
291                }
292                TransportEvent::Closed(t) => {
293                    debug!(addr=%t.get_addr(), "closed connection");
294                }
295            }
296        }
297        Ok(())
298    }
299
300    pub async fn process_timer(&self) {
301        loop {
302            for t in self.timers.wait_for_ready().await.into_iter() {
303                match t {
304                    TransactionTimer::TimerCleanup(key) => {
305                        trace!(%key, "TimerCleanup");
306                        self.transactions
307                            .write()
308                            .as_mut()
309                            .map(|ts| ts.remove(&key))
310                            .ok();
311                        self.finished_transactions
312                            .write()
313                            .as_mut()
314                            .map(|t| t.remove(&key))
315                            .ok();
316                        continue;
317                    }
318                    _ => {}
319                }
320
321                if let Ok(Some(tu)) =
322                    { self.transactions.read().as_ref().map(|ts| ts.get(&t.key())) }
323                {
324                    match tu.send(TransactionEvent::Timer(t)) {
325                        Ok(_) => {}
326                        Err(error::SendError(t)) => match t {
327                            TransactionEvent::Timer(t) => {
328                                self.detach_transaction(t.key(), None);
329                            }
330                            _ => {}
331                        },
332                    }
333                }
334            }
335        }
336    }
337
338    // Note: This function used for determine destination of response message, from via of the request
339    pub async fn get_destination_from_request(&self, req: &rsip::Request) -> Option<SipAddr> {
340        let (transport, host_with_port) =
341            SipConnection::parse_target_from_via(req.via_header().ok()?).ok()?;
342
343        let sip_addr = SipAddr {
344            r#type: Some(transport),
345            addr: host_with_port,
346        };
347
348        if matches!(sip_addr.addr.host, rsip::Host::Domain(_)) {
349            return self
350                .transport_layer
351                .inner
352                .domain_resolver
353                .resolve(&sip_addr)
354                .await
355                .ok();
356        }
357        Some(sip_addr)
358    }
359
360    // receive message from transport layer
361    pub async fn on_received_message(
362        self: &Arc<Self>,
363        msg: SipMessage,
364        connection: SipConnection,
365        from: &SipAddr,
366    ) -> Result<()> {
367        let mut key = match &msg {
368            SipMessage::Request(req) => {
369                TransactionKey::from_request(req, super::key::TransactionRole::Server)?
370            }
371            SipMessage::Response(resp) => {
372                TransactionKey::from_response(resp, super::key::TransactionRole::Client)?
373            }
374        };
375        match &msg {
376            SipMessage::Request(req) => {
377                match req.method() {
378                    rsip::Method::Ack => {
379                        match DialogId::try_from((req, super::key::TransactionRole::Server)) {
380                            Ok(dialog_id) => {
381                                let tx_key = self
382                                    .waiting_ack
383                                    .read()
384                                    .map(|wa| wa.get(&dialog_id).cloned());
385                                if let Ok(Some(tx_key)) = tx_key {
386                                    key = tx_key;
387                                }
388                            }
389                            Err(_) => {}
390                        }
391                    }
392                    _ => {}
393                }
394                // check is the termination of an existing transaction
395                let last_message = self
396                    .finished_transactions
397                    .read()
398                    .unwrap()
399                    .get(&key)
400                    .cloned()
401                    .flatten();
402
403                if let Some(last_message) = last_message {
404                    let dest = if !connection.is_reliable() {
405                        self.get_destination_from_request(req).await
406                    } else {
407                        None
408                    };
409                    connection.send(last_message, dest.as_ref()).await?;
410                    return Ok(());
411                }
412            }
413            SipMessage::Response(resp) => {
414                let last_message = self
415                    .finished_transactions
416                    .read()
417                    .unwrap()
418                    .get(&key)
419                    .cloned()
420                    .flatten();
421
422                if let Some(mut last_message) = last_message {
423                    match last_message {
424                        SipMessage::Request(ref mut last_req) => {
425                            if last_req.method() == &rsip::Method::Ack {
426                                match resp.status_code.kind() {
427                                    rsip::StatusCodeKind::Provisional => {
428                                        return Ok(());
429                                    }
430                                    rsip::StatusCodeKind::Successful => {
431                                        if last_req.to_header()?.tag().ok().is_none() {
432                                            // don't ack 2xx response when ack is placeholder
433                                            return Ok(());
434                                        }
435                                    }
436                                    _ => {}
437                                }
438
439                                if let Ok(Some(tag)) = resp.to_header().and_then(|h| h.tag()) {
440                                    last_req.to_header_mut().and_then(|h| h.mut_tag(tag)).ok();
441                                }
442
443                                if let rsip::StatusCodeKind::RequestFailure =
444                                    resp.status_code.kind()
445                                {
446                                    // for ACK to 487, send it where it came from
447                                    connection.send(last_message, Some(from)).await?;
448                                    return Ok(());
449                                }
450
451                                let dest = match destination_from_request(&last_req)
452                                    .and_then(|uri| SipAddr::try_from(uri).ok())
453                                {
454                                    Some(addr)
455                                        if matches!(addr.addr.host, rsip::Host::Domain(_)) =>
456                                    {
457                                        self.transport_layer
458                                            .inner
459                                            .domain_resolver
460                                            .resolve(&addr)
461                                            .await
462                                            .ok()
463                                    }
464                                    addr => addr,
465                                };
466
467                                connection.send(last_message, dest.as_ref()).await?;
468                            }
469                        }
470                        _ => {}
471                    }
472                    return Ok(());
473                }
474            }
475        };
476
477        let msg = if let Some(inspector) = &self.message_inspector {
478            inspector.after_received(msg, from)
479        } else {
480            msg
481        };
482
483        if let Some(tu) = self.transactions.read().unwrap().get(&key) {
484            tu.send(TransactionEvent::Received(msg, Some(connection)))
485                .map_err(|e| Error::TransactionError(e.to_string(), key))?;
486            return Ok(());
487        }
488        // if the transaction is not exist, create a new transaction
489        let request = match msg {
490            SipMessage::Request(req) => req,
491            SipMessage::Response(resp) => {
492                if resp.cseq_header()?.method()? != rsip::Method::Cancel {
493                    debug!(%key, response = %resp, "the transaction does not exist");
494                }
495                return Ok(());
496            }
497        };
498
499        match request.method {
500            rsip::Method::Cancel => {
501                let resp = self.make_response(
502                    &request,
503                    rsip::StatusCode::CallTransactionDoesNotExist,
504                    None,
505                );
506                let resp = if let Some(ref inspector) = self.message_inspector {
507                    inspector.before_send(resp.into(), None)
508                } else {
509                    resp.into()
510                };
511
512                let dest = if !connection.is_reliable() {
513                    self.get_destination_from_request(&request).await
514                } else {
515                    None
516                };
517                connection.send(resp, dest.as_ref()).await?;
518                return Ok(());
519            }
520            rsip::Method::Ack => return Ok(()),
521            _ => {}
522        }
523
524        let tx =
525            Transaction::new_server(key.clone(), request.clone(), self.clone(), Some(connection));
526
527        self.incoming_sender.send(tx).ok();
528        Ok(())
529    }
530
531    pub fn attach_transaction(&self, key: &TransactionKey, tu_sender: TransactionEventSender) {
532        trace!(%key, "attach transaction");
533        self.transactions
534            .write()
535            .as_mut()
536            .map(|ts| ts.insert(key.clone(), tu_sender))
537            .ok();
538    }
539
540    pub fn detach_transaction(&self, key: &TransactionKey, last_message: Option<SipMessage>) {
541        trace!(%key, "detach transaction");
542        self.transactions
543            .write()
544            .as_mut()
545            .map(|ts| ts.remove(key))
546            .ok();
547
548        if let Some(msg) = last_message {
549            self.timers.timeout(
550                self.option.t1x64,
551                TransactionTimer::TimerCleanup(key.clone()), // maybe use TimerK ???
552            );
553
554            self.finished_transactions
555                .write()
556                .as_mut()
557                .map(|ft| ft.insert(key.clone(), Some(msg)))
558                .ok();
559        }
560    }
561
562    pub fn get_addrs(&self) -> Vec<SipAddr> {
563        self.transport_layer.get_addrs()
564    }
565
566    pub fn get_record_route(&self) -> Result<rsip::typed::RecordRoute> {
567        let first_addr = self
568            .transport_layer
569            .get_addrs()
570            .first()
571            .ok_or(Error::EndpointError("not sipaddrs".to_string()))
572            .cloned()?;
573        let rr = rsip::UriWithParamsList(vec![rsip::UriWithParams {
574            uri: first_addr.into(),
575            params: vec![rsip::Param::Other("lr".into(), None)],
576        }]);
577        Ok(rr.into())
578    }
579
580    pub fn get_via(
581        &self,
582        addr: Option<crate::transport::SipAddr>,
583        branch: Option<rsip::Param>,
584    ) -> Result<rsip::typed::Via> {
585        let first_addr = match addr {
586            Some(addr) => addr,
587            None => self
588                .transport_layer
589                .get_addrs()
590                .first()
591                .ok_or(Error::EndpointError("not sipaddrs".to_string()))
592                .cloned()?,
593        };
594
595        let via = rsip::typed::Via {
596            version: rsip::Version::V2,
597            transport: first_addr.r#type.unwrap_or_default(),
598            uri: first_addr.addr.into(),
599            params: vec![
600                branch.unwrap_or_else(make_via_branch),
601                rsip::Param::Other("rport".into(), None),
602            ],
603        };
604        Ok(via)
605    }
606
607    pub fn get_running_transactions(&self) -> Option<Vec<TransactionKey>> {
608        self.transactions
609            .read()
610            .map(|ts| ts.keys().cloned().collect())
611            .ok()
612    }
613
614    pub fn get_stats(&self) -> EndpointStats {
615        let waiting_ack = self
616            .waiting_ack
617            .read()
618            .map(|wa| wa.len())
619            .unwrap_or_default();
620        let running_transactions = self
621            .transactions
622            .read()
623            .map(|ts| ts.len())
624            .unwrap_or_default();
625        let finished_transactions = self
626            .finished_transactions
627            .read()
628            .map(|ft| ft.len())
629            .unwrap_or_default();
630
631        EndpointStats {
632            running_transactions,
633            finished_transactions,
634            waiting_ack,
635        }
636    }
637}
638
639impl EndpointBuilder {
640    pub fn new() -> Self {
641        EndpointBuilder {
642            allows: Vec::new(),
643            user_agent: VERSION.to_string(),
644            transport_layer: None,
645            cancel_token: None,
646            timer_interval: None,
647            option: None,
648            message_inspector: None,
649            target_locator: None,
650            transport_inspector: None,
651            domain_resolver: None,
652        }
653    }
654
655    pub fn with_option(&mut self, option: EndpointOption) -> &mut Self {
656        self.option = Some(option);
657        self
658    }
659
660    pub fn with_user_agent(&mut self, user_agent: &str) -> &mut Self {
661        self.user_agent = user_agent.to_string();
662        self
663    }
664
665    pub fn with_transport_layer(&mut self, transport_layer: TransportLayer) -> &mut Self {
666        self.transport_layer.replace(transport_layer);
667        self
668    }
669
670    pub fn with_cancel_token(&mut self, cancel_token: CancellationToken) -> &mut Self {
671        self.cancel_token.replace(cancel_token);
672        self
673    }
674
675    pub fn with_timer_interval(&mut self, timer_interval: Duration) -> &mut Self {
676        self.timer_interval.replace(timer_interval);
677        self
678    }
679    pub fn with_allows(&mut self, allows: Vec<rsip::Method>) -> &mut Self {
680        self.allows = allows;
681        self
682    }
683    pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
684        self.message_inspector = Some(inspector);
685        self
686    }
687    pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
688        self.target_locator = Some(locator);
689        self
690    }
691
692    pub fn with_transport_inspector(
693        &mut self,
694        inspector: Box<dyn TransportEventInspector>,
695    ) -> &mut Self {
696        self.transport_inspector = Some(inspector);
697        self
698    }
699
700    pub fn with_domain_resolver(&mut self, resolver: Box<dyn DomainResolver>) -> &mut Self {
701        self.domain_resolver = Some(resolver);
702        self
703    }
704
705    pub fn build(&mut self) -> Endpoint {
706        let cancel_token = self.cancel_token.take().unwrap_or_default();
707        let transport_layer = self.transport_layer.take().unwrap_or_else(|| {
708            if let Some(resolver) = self.domain_resolver.take() {
709                TransportLayer::new_with_domain_resolver(cancel_token.clone(), resolver)
710            } else {
711                TransportLayer::new(cancel_token.clone())
712            }
713        });
714
715        let allows = self.allows.to_owned();
716        let user_agent = self.user_agent.to_owned();
717        let timer_interval = self.timer_interval.to_owned();
718        let option = self.option.take();
719        let message_inspector = self.message_inspector.take();
720        let locator = self.target_locator.take();
721        let transport_inspector = self.transport_inspector.take();
722
723        let core = EndpointInner::new(
724            user_agent,
725            transport_layer,
726            cancel_token,
727            timer_interval,
728            allows,
729            option,
730            message_inspector,
731            locator,
732            transport_inspector,
733        );
734
735        Endpoint { inner: core }
736    }
737}
738
739impl Endpoint {
740    pub async fn serve(&self) {
741        let inner = self.inner.clone();
742        match inner.serve().await {
743            Ok(()) => {
744                info!("endpoint shutdown");
745            }
746            Err(e) => {
747                warn!(error = ?e, "endpoint serve error");
748            }
749        }
750    }
751
752    pub fn shutdown(&self) {
753        info!("endpoint shutdown requested");
754        self.inner.cancel_token.cancel();
755    }
756
757    //
758    // get incoming requests from the endpoint
759    // don't call repeat!
760    pub fn incoming_transactions(&self) -> Result<TransactionReceiver> {
761        self.inner
762            .incoming_receiver
763            .lock()
764            .unwrap()
765            .take()
766            .ok_or_else(|| Error::EndpointError("incoming recevier taken".to_string()))
767    }
768
769    pub fn get_addrs(&self) -> Vec<SipAddr> {
770        self.inner.transport_layer.get_addrs()
771    }
772}