ftth_rsipstack/transaction/
endpoint.rs

1use super::{
2    key::TransactionKey,
3    make_via_branch,
4    timer::Timer,
5    transaction::{Transaction, TransactionEvent, TransactionEventSender},
6    SipConnection, TransactionReceiver, TransactionSender, TransactionTimer,
7};
8use crate::{
9    dialog::DialogId,
10    transport::{SipAddr, TransportEvent, TransportLayer},
11    Error, Result, VERSION,
12};
13use rsip::{prelude::HeadersExt, SipMessage};
14use std::{
15    collections::HashMap,
16    sync::{Arc, Mutex, RwLock},
17    time::{Duration, Instant},
18};
19use tokio::{
20    select,
21    sync::mpsc::{error, unbounded_channel},
22};
23use tokio_util::sync::CancellationToken;
24use tracing::{debug, info, trace, warn};
25
26pub trait MessageInspector: Send + Sync {
27    fn before_send(&self, msg: SipMessage) -> SipMessage;
28    fn after_received(&self, msg: SipMessage) -> SipMessage;
29}
30
/// Tunable protocol parameters of an endpoint.
///
/// Use [`Default::default`] for the standard values and override individual
/// fields with struct-update syntax.
#[derive(Debug, Clone)]
pub struct EndpointOption {
    /// SIP timer T1, the round-trip-time estimate (default 500 ms).
    pub t1: Duration,
    /// SIP timer T4, the maximum time a message stays in the network
    /// (default 4 s).
    pub t4: Duration,
    /// 64 * T1 (default 32 s). Also used by the endpoint as the retention
    /// time of a finished transaction's last message.
    pub t1x64: Duration,
    /// Default 180 s. NOTE(review): presumably RFC 3261 Timer C; it is not
    /// read in this file - confirm against the transaction code.
    pub timerc: Duration,
    /// Default `true`. NOTE(review): consumed outside this file; presumably
    /// controls whether out-of-dialog OPTIONS requests are ignored.
    pub ignore_out_of_dialog_option: bool,
    /// Default `None`. NOTE(review): consumed outside this file; presumably a
    /// suffix appended to generated Call-ID values.
    pub callid_suffix: Option<String>,
    /// Default 60 s. NOTE(review): consumed outside this file; presumably the
    /// dialog keep-alive period, `None` disabling it.
    pub dialog_keepalive_duration: Option<Duration>,
}

impl Default for EndpointOption {
    fn default() -> Self {
        // Derive the 64*T1 value from T1 so the two defaults cannot drift apart.
        let t1 = Duration::from_millis(500);
        EndpointOption {
            t1,
            t4: Duration::from_secs(4),
            t1x64: t1 * 64,
            timerc: Duration::from_secs(180),
            ignore_out_of_dialog_option: true,
            callid_suffix: None,
            dialog_keepalive_duration: Some(Duration::from_secs(60)),
        }
    }
}
54
/// Point-in-time sizes of the endpoint's internal transaction tables.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct EndpointStats {
    /// Number of transactions currently attached to the endpoint.
    pub running_transactions: usize,
    /// Number of finished transactions whose last message is still cached.
    pub finished_transactions: usize,
    /// Number of entries in the dialog-to-transaction table used to route ACKs.
    pub waiting_ack: usize,
}
60
61/// SIP Endpoint Core Implementation
62///
63/// `EndpointInner` is the core implementation of a SIP endpoint that manages
64/// transactions, timers, and transport layer communication. It serves as the
65/// central coordination point for all SIP protocol operations.
66///
67/// # Key Responsibilities
68///
69/// * Managing active SIP transactions
70/// * Handling SIP timers (Timer A, B, D, E, F, G, K)
71/// * Coordinating with the transport layer
72/// * Processing incoming and outgoing SIP messages
73/// * Maintaining transaction state and cleanup
74///
75/// # Fields
76///
77/// * `allows` - List of supported SIP methods
78/// * `user_agent` - User-Agent header value for outgoing messages
79/// * `timers` - Timer management system for SIP timers
80/// * `transport_layer` - Transport layer for network communication
81/// * `finished_transactions` - Cache of completed transactions
82/// * `transactions` - Active transaction senders
83/// * `incoming_sender` - Channel for incoming transaction notifications
84/// * `cancel_token` - Cancellation token for graceful shutdown
85/// * `timer_interval` - Interval for timer processing
86/// * `t1`, `t4`, `t1x64` - SIP timer values as per RFC 3261
87///
88/// # Timer Values
89///
90/// * `t1` - RTT estimate (default 500ms)
91/// * `t4` - Maximum duration a message will remain in the network (default 4s)
92/// * `t1x64` - Maximum retransmission timeout (default 32s)
93pub struct EndpointInner {
94    pub allows: Mutex<Option<Vec<rsip::Method>>>,
95    pub user_agent: String,
96    pub timers: Timer<TransactionTimer>,
97    pub transport_layer: TransportLayer,
98    pub finished_transactions: RwLock<HashMap<TransactionKey, Option<SipMessage>>>,
99    pub transactions: RwLock<HashMap<TransactionKey, TransactionEventSender>>,
100    pub waiting_ack: RwLock<HashMap<DialogId, TransactionKey>>,
101    incoming_sender: TransactionSender,
102    incoming_receiver: Mutex<Option<TransactionReceiver>>,
103    cancel_token: CancellationToken,
104    timer_interval: Duration,
105    pub(super) inspector: Option<Box<dyn MessageInspector>>,
106    pub option: EndpointOption,
107}
108pub type EndpointInnerRef = Arc<EndpointInner>;
109
110/// SIP Endpoint Builder
111///
112/// `EndpointBuilder` provides a fluent interface for constructing SIP endpoints
113/// with custom configuration. It follows the builder pattern to allow flexible
114/// endpoint configuration.
115///
116/// # Examples
117///
118/// ```rust
119/// use rsipstack::EndpointBuilder;
120/// use std::time::Duration;
121///
122/// let endpoint = EndpointBuilder::new()
123///     .with_user_agent("MyApp/1.0")
124///     .with_timer_interval(Duration::from_millis(10))
125///     .with_allows(vec![rsip::Method::Invite, rsip::Method::Bye])
126///     .build();
127/// ```
128pub struct EndpointBuilder {
129    allows: Vec<rsip::Method>,
130    user_agent: String,
131    transport_layer: Option<TransportLayer>,
132    cancel_token: Option<CancellationToken>,
133    timer_interval: Option<Duration>,
134    option: Option<EndpointOption>,
135    inspector: Option<Box<dyn MessageInspector>>,
136}
137
138/// SIP Endpoint
139///
140/// `Endpoint` is the main entry point for SIP protocol operations. It provides
141/// a high-level interface for creating and managing SIP transactions, handling
142/// incoming requests, and coordinating with the transport layer.
143///
144/// # Key Features
145///
146/// * Transaction management and lifecycle
147/// * Automatic timer handling per RFC 3261
148/// * Transport layer abstraction
149/// * Graceful shutdown support
150/// * Incoming request processing
151///
152/// # Examples
153///
154/// ```rust,no_run
155/// use rsipstack::EndpointBuilder;
156/// use tokio_util::sync::CancellationToken;
157///
158/// #[tokio::main]
159/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
160///     let endpoint = EndpointBuilder::new()
161///         .with_user_agent("MyApp/1.0")
162///         .build();
163///     
164///     // Get incoming transactions
165///     let mut incoming = endpoint.incoming_transactions().expect("incoming_transactions");
166///     
167///     // Start the endpoint
168///     let endpoint_inner = endpoint.inner.clone();
169///     tokio::spawn(async move {
170///          endpoint_inner.serve().await.ok();
171///     });
172///     
173///     // Process incoming transactions
174///     while let Some(transaction) = incoming.recv().await {
175///         // Handle transaction
176///         break; // Exit for example
177///     }
178///     
179///     Ok(())
180/// }
181/// ```
182///
183/// # Lifecycle
184///
185/// 1. Create endpoint using `EndpointBuilder`
186/// 2. Start serving with `serve()` method
187/// 3. Process incoming transactions via `incoming_transactions()`
188/// 4. Shutdown gracefully with `shutdown()`
189pub struct Endpoint {
190    pub inner: EndpointInnerRef,
191}
192
193impl EndpointInner {
194    pub fn new(
195        user_agent: String,
196        transport_layer: TransportLayer,
197        cancel_token: CancellationToken,
198        timer_interval: Option<Duration>,
199        allows: Vec<rsip::Method>,
200        option: Option<EndpointOption>,
201        inspector: Option<Box<dyn MessageInspector>>,
202    ) -> Arc<Self> {
203        let (incoming_sender, incoming_receiver) = unbounded_channel();
204        Arc::new(EndpointInner {
205            allows: Mutex::new(Some(allows)),
206            user_agent,
207            timers: Timer::new(),
208            transport_layer,
209            transactions: RwLock::new(HashMap::new()),
210            finished_transactions: RwLock::new(HashMap::new()),
211            waiting_ack: RwLock::new(HashMap::new()),
212            timer_interval: timer_interval.unwrap_or(Duration::from_millis(20)),
213            cancel_token,
214            incoming_sender,
215            incoming_receiver: Mutex::new(Some(incoming_receiver)),
216            option: option.unwrap_or_default(),
217            inspector,
218        })
219    }
220
221    pub async fn serve(self: &Arc<Self>) -> Result<()> {
222        select! {
223            _ = self.cancel_token.cancelled() => {
224            },
225            r = self.process_timer() => {
226                _ = r?
227            },
228            r = self.clone().process_transport_layer() => {
229                _ = r?
230            },
231        }
232        Ok(())
233    }
234
235    // process transport layer, receive message from transport layer
236    async fn process_transport_layer(self: Arc<Self>) -> Result<()> {
237        self.transport_layer.serve_listens().await.ok();
238
239        let mut transport_rx = match self
240            .transport_layer
241            .inner
242            .transport_rx
243            .lock()
244            .unwrap()
245            .take()
246        {
247            Some(rx) => rx,
248            None => {
249                return Err(Error::EndpointError("transport_rx not set".to_string()));
250            }
251        };
252
253        while let Some(event) = transport_rx.recv().await {
254            match event {
255                TransportEvent::Incoming(msg, connection, from) => {
256                    match self.on_received_message(msg, connection).await {
257                        Ok(()) => {}
258                        Err(e) => {
259                            warn!(addr=%from,"on_received_message error: {}", e);
260                        }
261                    }
262                }
263                TransportEvent::New(t) => {
264                    info!(addr=%t.get_addr(), "new connection");
265                }
266                TransportEvent::Closed(t) => {
267                    info!(addr=%t.get_addr(), "closed connection");
268                }
269            }
270        }
271        Ok(())
272    }
273
274    pub async fn process_timer(&self) -> Result<()> {
275        let mut ticker = tokio::time::interval(self.timer_interval);
276        loop {
277            for t in self.timers.poll(Instant::now()) {
278                match t {
279                    TransactionTimer::TimerCleanup(key) => {
280                        trace!(%key, "TimerCleanup");
281                        self.transactions
282                            .write()
283                            .as_mut()
284                            .map(|ts| ts.remove(&key))
285                            .ok();
286                        self.finished_transactions
287                            .write()
288                            .as_mut()
289                            .map(|t| t.remove(&key))
290                            .ok();
291                        continue;
292                    }
293                    _ => {}
294                }
295
296                if let Ok(Some(tu)) =
297                    { self.transactions.read().as_ref().map(|ts| ts.get(&t.key())) }
298                {
299                    match tu.send(TransactionEvent::Timer(t)) {
300                        Ok(_) => {}
301                        Err(error::SendError(t)) => match t {
302                            TransactionEvent::Timer(t) => {
303                                self.detach_transaction(t.key(), None);
304                            }
305                            _ => {}
306                        },
307                    }
308                }
309            }
310            ticker.tick().await;
311        }
312    }
313
314    // receive message from transport layer
315    pub async fn on_received_message(
316        self: &Arc<Self>,
317        msg: SipMessage,
318        connection: SipConnection,
319    ) -> Result<()> {
320        let mut key = match &msg {
321            SipMessage::Request(req) => {
322                TransactionKey::from_request(req, super::key::TransactionRole::Server)?
323            }
324            SipMessage::Response(resp) => {
325                TransactionKey::from_response(resp, super::key::TransactionRole::Client)?
326            }
327        };
328        match &msg {
329            SipMessage::Request(req) => {
330                match req.method() {
331                    rsip::Method::Ack => match DialogId::try_from(req) {
332                        Ok(dialog_id) => {
333                            let tx_key = self
334                                .waiting_ack
335                                .read()
336                                .map(|wa| wa.get(&dialog_id).cloned());
337                            if let Ok(Some(tx_key)) = tx_key {
338                                key = tx_key;
339                            }
340                        }
341                        Err(_) => {}
342                    },
343                    _ => {}
344                }
345                // check is the termination of an existing transaction
346                let last_message = self
347                    .finished_transactions
348                    .read()
349                    .unwrap()
350                    .get(&key)
351                    .cloned()
352                    .flatten();
353
354                if let Some(last_message) = last_message {
355                    connection.send(last_message, None).await?;
356                    return Ok(());
357                }
358            }
359            SipMessage::Response(resp) => {
360                let last_message = self
361                    .finished_transactions
362                    .read()
363                    .unwrap()
364                    .get(&key)
365                    .cloned()
366                    .flatten();
367
368                if let Some(mut last_message) = last_message {
369                    match last_message {
370                        SipMessage::Request(ref mut last_req) => {
371                            if last_req.method() == &rsip::Method::Ack {
372                                match resp.status_code.kind() {
373                                    rsip::StatusCodeKind::Provisional => {
374                                        return Ok(());
375                                    }
376                                    rsip::StatusCodeKind::Successful => {
377                                        if last_req.to_header()?.tag().ok().is_none() {
378                                            // don't ack 2xx response when ack is placeholder
379                                            return Ok(());
380                                        }
381                                    }
382                                    _ => {}
383                                }
384                                if let Ok(Some(tag)) = resp.to_header()?.tag() {
385                                    last_req.to_header_mut().and_then(|h| h.mut_tag(tag)).ok();
386                                }
387                            }
388                        }
389                        _ => {}
390                    }
391                    connection.send(last_message, None).await?;
392                    return Ok(());
393                }
394            }
395        };
396
397        let msg = if let Some(inspector) = &self.inspector {
398            inspector.after_received(msg)
399        } else {
400            msg
401        };
402
403        if let Some(tu) = self.transactions.read().unwrap().get(&key) {
404            tu.send(TransactionEvent::Received(msg, Some(connection)))
405                .map_err(|e| Error::TransactionError(e.to_string(), key))?;
406            return Ok(());
407        }
408        // if the transaction is not exist, create a new transaction
409        let request = match msg {
410            SipMessage::Request(req) => req,
411            SipMessage::Response(resp) => {
412                if resp.cseq_header()?.method()? != rsip::Method::Cancel {
413                    debug!(%key, "the transaction is not exist {}", resp);
414                }
415                return Ok(());
416            }
417        };
418
419        match request.method {
420            rsip::Method::Cancel => {
421                let resp = self.make_response(
422                    &request,
423                    rsip::StatusCode::CallTransactionDoesNotExist,
424                    None,
425                );
426                let resp = if let Some(ref inspector) = self.inspector {
427                    inspector.before_send(resp.into())
428                } else {
429                    resp.into()
430                };
431                connection.send(resp, None).await?;
432                return Ok(());
433            }
434            rsip::Method::Ack => return Ok(()),
435            _ => {}
436        }
437
438        let tx =
439            Transaction::new_server(key.clone(), request.clone(), self.clone(), Some(connection));
440
441        self.incoming_sender.send(tx).ok();
442        Ok(())
443    }
444
445    pub fn attach_transaction(&self, key: &TransactionKey, tu_sender: TransactionEventSender) {
446        trace!(%key, "attach transaction");
447        self.transactions
448            .write()
449            .as_mut()
450            .map(|ts| ts.insert(key.clone(), tu_sender))
451            .ok();
452    }
453
454    pub fn detach_transaction(&self, key: &TransactionKey, last_message: Option<SipMessage>) {
455        trace!(%key, "detach transaction");
456        self.transactions
457            .write()
458            .as_mut()
459            .map(|ts| ts.remove(key))
460            .ok();
461
462        if let Some(msg) = last_message {
463            self.timers.timeout(
464                self.option.t1x64,
465                TransactionTimer::TimerCleanup(key.clone()), // maybe use TimerK ???
466            );
467
468            self.finished_transactions
469                .write()
470                .as_mut()
471                .map(|ft| ft.insert(key.clone(), Some(msg)))
472                .ok();
473        }
474    }
475
476    pub fn get_addrs(&self) -> Vec<SipAddr> {
477        self.transport_layer.get_addrs()
478    }
479
480    pub fn get_record_route(&self) -> Result<rsip::typed::RecordRoute> {
481        let first_addr = self
482            .transport_layer
483            .get_addrs()
484            .first()
485            .ok_or(Error::EndpointError("not sipaddrs".to_string()))
486            .cloned()?;
487        let rr = rsip::UriWithParamsList(vec![rsip::UriWithParams {
488            uri: first_addr.into(),
489            params: vec![rsip::Param::Other("lr".into(), None)],
490        }]);
491        Ok(rr.into())
492    }
493
494    pub fn get_via(
495        &self,
496        addr: Option<crate::transport::SipAddr>,
497        branch: Option<rsip::Param>,
498    ) -> Result<rsip::typed::Via> {
499        let first_addr = match addr {
500            Some(addr) => addr,
501            None => self
502                .transport_layer
503                .get_addrs()
504                .first()
505                .ok_or(Error::EndpointError("not sipaddrs".to_string()))
506                .cloned()?,
507        };
508
509        let via = rsip::typed::Via {
510            version: rsip::Version::V2,
511            transport: first_addr.r#type.unwrap_or_default(),
512            uri: first_addr.addr.into(),
513            params: vec![
514                branch.unwrap_or_else(make_via_branch),
515                rsip::Param::Other("rport".into(), None),
516            ],
517        };
518        Ok(via)
519    }
520
521    pub fn get_stats(&self) -> EndpointStats {
522        let waiting_ack = self
523            .waiting_ack
524            .read()
525            .map(|wa| wa.len())
526            .unwrap_or_default();
527        let running_transactions = self
528            .transactions
529            .read()
530            .map(|ts| ts.len())
531            .unwrap_or_default();
532        let finished_transactions = self
533            .finished_transactions
534            .read()
535            .map(|ft| ft.len())
536            .unwrap_or_default();
537
538        EndpointStats {
539            running_transactions,
540            finished_transactions,
541            waiting_ack,
542        }
543    }
544}
545
546impl EndpointBuilder {
547    pub fn new() -> Self {
548        EndpointBuilder {
549            allows: Vec::new(),
550            user_agent: VERSION.to_string(),
551            transport_layer: None,
552            cancel_token: None,
553            timer_interval: None,
554            option: None,
555            inspector: None,
556        }
557    }
558    pub fn with_option(&mut self, option: EndpointOption) -> &mut Self {
559        self.option = Some(option);
560        self
561    }
562    pub fn with_user_agent(&mut self, user_agent: &str) -> &mut Self {
563        self.user_agent = user_agent.to_string();
564        self
565    }
566
567    pub fn with_transport_layer(&mut self, transport_layer: TransportLayer) -> &mut Self {
568        self.transport_layer.replace(transport_layer);
569        self
570    }
571
572    pub fn with_cancel_token(&mut self, cancel_token: CancellationToken) -> &mut Self {
573        self.cancel_token.replace(cancel_token);
574        self
575    }
576
577    pub fn with_timer_interval(&mut self, timer_interval: Duration) -> &mut Self {
578        self.timer_interval.replace(timer_interval);
579        self
580    }
581    pub fn with_allows(&mut self, allows: Vec<rsip::Method>) -> &mut Self {
582        self.allows = allows;
583        self
584    }
585    pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
586        self.inspector = Some(inspector);
587        self
588    }
589    pub fn build(&mut self) -> Endpoint {
590        let cancel_token = self.cancel_token.take().unwrap_or_default();
591
592        let transport_layer = self
593            .transport_layer
594            .take()
595            .unwrap_or(TransportLayer::new(cancel_token.child_token()));
596
597        let allows = self.allows.to_owned();
598        let user_agent = self.user_agent.to_owned();
599        let timer_interval = self.timer_interval.to_owned();
600        let option = self.option.take();
601        let inspector = self.inspector.take();
602
603        let core = EndpointInner::new(
604            user_agent,
605            transport_layer,
606            cancel_token,
607            timer_interval,
608            allows,
609            option,
610            inspector,
611        );
612
613        Endpoint { inner: core }
614    }
615}
616
617impl Endpoint {
618    pub async fn serve(&self) {
619        let inner = self.inner.clone();
620        match inner.serve().await {
621            Ok(()) => {
622                info!("endpoint shutdown");
623            }
624            Err(e) => {
625                warn!("endpoint serve error: {:?}", e);
626            }
627        }
628    }
629
630    pub fn shutdown(&self) {
631        info!("endpoint shutdown requested");
632        self.inner.cancel_token.cancel();
633    }
634
635    //
636    // get incoming requests from the endpoint
637    // don't call repeat!
638    pub fn incoming_transactions(&self) -> Result<TransactionReceiver> {
639        self.inner
640            .incoming_receiver
641            .lock()
642            .unwrap()
643            .take()
644            .ok_or_else(|| Error::EndpointError("incoming recevier taken".to_string()))
645    }
646
647    pub fn get_addrs(&self) -> Vec<SipAddr> {
648        self.inner.transport_layer.get_addrs()
649    }
650}