ftth_rsipstack/transaction/
endpoint.rs

1use super::{
2    key::TransactionKey,
3    make_via_branch,
4    timer::Timer,
5    transaction::{Transaction, TransactionEvent, TransactionEventSender},
6    SipConnection, TransactionReceiver, TransactionSender, TransactionTimer,
7};
8use crate::rsip;
9use crate::{
10    dialog::DialogId,
11    transport::{SipAddr, TransportEvent, TransportLayer},
12    Error, Result, VERSION,
13};
14use rsip::{prelude::HeadersExt, SipMessage};
15use std::{
16    collections::HashMap,
17    sync::{Arc, Mutex, RwLock},
18    time::{Duration, Instant},
19};
20use tokio::{
21    select,
22    sync::mpsc::{error, unbounded_channel},
23};
24use tokio_util::sync::CancellationToken;
25use tracing::{debug, info, trace, warn};
26
27pub trait MessageInspector: Send + Sync {
28    fn before_send(&self, msg: SipMessage) -> SipMessage;
29    fn after_received(&self, msg: SipMessage) -> SipMessage;
30}
31
/// Tunable settings for an endpoint.
///
/// Use [`Default::default`] for the stock values and override individual
/// fields with struct-update syntax:
/// `EndpointOption { t1: Duration::from_millis(250), ..Default::default() }`.
/// Note that overriding `t1` this way does not recompute `t1x64`.
#[derive(Debug, Clone)]
pub struct EndpointOption {
    /// SIP timer T1, the round-trip-time estimate (default 500 ms).
    pub t1: Duration,
    /// SIP timer T4, the maximum time a message remains in the network
    /// (default 4 s).
    pub t4: Duration,
    /// 64 × T1 (default 32 s). The endpoint also uses it as the delay before a
    /// finished transaction's cached last message is cleaned up.
    pub t1x64: Duration,
    /// Default 180 s.
    /// NOTE(review): presumably RFC 3261 Timer C; it is not read in this file.
    pub timerc: Duration,
    /// Default `true`.
    /// NOTE(review): presumably makes the stack ignore OPTIONS requests that
    /// arrive outside a dialog; confirm against the code that reads it.
    pub ignore_out_of_dialog_option: bool,
    /// Default `None`.
    /// NOTE(review): presumably a suffix appended to generated Call-IDs;
    /// confirm against the code that reads it.
    pub callid_suffix: Option<String>,
    /// Default `Some(60 s)`.
    /// NOTE(review): presumably the dialog keep-alive interval, with `None`
    /// disabling it; confirm against the code that reads it.
    pub dialog_keepalive_duration: Option<Duration>,
}

impl Default for EndpointOption {
    /// Returns the stock configuration: T1 = 500 ms, T4 = 4 s, 64×T1 = 32 s,
    /// `timerc` = 180 s, out-of-dialog OPTIONS ignored, no Call-ID suffix and a
    /// 60 s dialog keep-alive.
    fn default() -> Self {
        let t1 = Duration::from_millis(500);
        Self {
            t1,
            t4: Duration::from_secs(4),
            // Derived from `t1` so the two defaults cannot drift apart.
            t1x64: t1 * 64,
            timerc: Duration::from_secs(180),
            ignore_out_of_dialog_option: true,
            callid_suffix: None,
            dialog_keepalive_duration: Some(Duration::from_secs(60)),
        }
    }
}
55
/// Snapshot of the endpoint's bookkeeping tables.
///
/// Each field is the number of entries in the corresponding map at the moment
/// the snapshot was taken.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct EndpointStats {
    /// Number of transactions currently registered with the endpoint.
    pub running_transactions: usize,
    /// Number of finished transactions still held in the cache.
    pub finished_transactions: usize,
    /// Number of entries in the waiting-for-ACK table.
    pub waiting_ack: usize,
}
61
62/// SIP Endpoint Core Implementation
63///
64/// `EndpointInner` is the core implementation of a SIP endpoint that manages
65/// transactions, timers, and transport layer communication. It serves as the
66/// central coordination point for all SIP protocol operations.
67///
68/// # Key Responsibilities
69///
70/// * Managing active SIP transactions
71/// * Handling SIP timers (Timer A, B, D, E, F, G, K)
72/// * Coordinating with the transport layer
73/// * Processing incoming and outgoing SIP messages
74/// * Maintaining transaction state and cleanup
75///
76/// # Fields
77///
78/// * `allows` - List of supported SIP methods
79/// * `user_agent` - User-Agent header value for outgoing messages
80/// * `timers` - Timer management system for SIP timers
81/// * `transport_layer` - Transport layer for network communication
82/// * `finished_transactions` - Cache of completed transactions
83/// * `transactions` - Active transaction senders
84/// * `incoming_sender` - Channel for incoming transaction notifications
85/// * `cancel_token` - Cancellation token for graceful shutdown
86/// * `timer_interval` - Interval for timer processing
87/// * `t1`, `t4`, `t1x64` - SIP timer values as per RFC 3261
88///
89/// # Timer Values
90///
91/// * `t1` - RTT estimate (default 500ms)
92/// * `t4` - Maximum duration a message will remain in the network (default 4s)
93/// * `t1x64` - Maximum retransmission timeout (default 32s)
94pub struct EndpointInner {
95    pub allows: Mutex<Option<Vec<rsip::Method>>>,
96    pub user_agent: String,
97    pub timers: Timer<TransactionTimer>,
98    pub transport_layer: TransportLayer,
99    pub finished_transactions: RwLock<HashMap<TransactionKey, Option<SipMessage>>>,
100    pub transactions: RwLock<HashMap<TransactionKey, TransactionEventSender>>,
101    pub waiting_ack: RwLock<HashMap<DialogId, TransactionKey>>,
102    incoming_sender: TransactionSender,
103    incoming_receiver: Mutex<Option<TransactionReceiver>>,
104    cancel_token: CancellationToken,
105    timer_interval: Duration,
106    pub(super) inspector: Option<Box<dyn MessageInspector>>,
107    pub option: EndpointOption,
108}
109pub type EndpointInnerRef = Arc<EndpointInner>;
110
111/// SIP Endpoint Builder
112///
113/// `EndpointBuilder` provides a fluent interface for constructing SIP endpoints
114/// with custom configuration. It follows the builder pattern to allow flexible
115/// endpoint configuration.
116///
117/// # Examples
118///
119/// ```rust
120/// use ftth_rsipstack::EndpointBuilder;
121/// use std::time::Duration;
122///
123/// let endpoint = EndpointBuilder::new()
124///     .with_user_agent("MyApp/1.0")
125///     .with_timer_interval(Duration::from_millis(10))
126///     .with_allows(vec![rsip::Method::Invite, rsip::Method::Bye])
127///     .build();
128/// ```
129pub struct EndpointBuilder {
130    allows: Vec<rsip::Method>,
131    user_agent: String,
132    transport_layer: Option<TransportLayer>,
133    cancel_token: Option<CancellationToken>,
134    timer_interval: Option<Duration>,
135    option: Option<EndpointOption>,
136    inspector: Option<Box<dyn MessageInspector>>,
137}
138
139/// SIP Endpoint
140///
141/// `Endpoint` is the main entry point for SIP protocol operations. It provides
142/// a high-level interface for creating and managing SIP transactions, handling
143/// incoming requests, and coordinating with the transport layer.
144///
145/// # Key Features
146///
147/// * Transaction management and lifecycle
148/// * Automatic timer handling per RFC 3261
149/// * Transport layer abstraction
150/// * Graceful shutdown support
151/// * Incoming request processing
152///
153/// # Examples
154///
155/// ```rust,no_run
156/// use ftth_rsipstack::EndpointBuilder;
157/// use tokio_util::sync::CancellationToken;
158///
159/// #[tokio::main]
160/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
161///     let endpoint = EndpointBuilder::new()
162///         .with_user_agent("MyApp/1.0")
163///         .build();
164///     
165///     // Get incoming transactions
166///     let mut incoming = endpoint.incoming_transactions().expect("incoming_transactions");
167///     
168///     // Start the endpoint
169///     let endpoint_inner = endpoint.inner.clone();
170///     tokio::spawn(async move {
171///          endpoint_inner.serve().await.ok();
172///     });
173///     
174///     // Process incoming transactions
175///     while let Some(transaction) = incoming.recv().await {
176///         // Handle transaction
177///         break; // Exit for example
178///     }
179///     
180///     Ok(())
181/// }
182/// ```
183///
184/// # Lifecycle
185///
186/// 1. Create endpoint using `EndpointBuilder`
187/// 2. Start serving with `serve()` method
188/// 3. Process incoming transactions via `incoming_transactions()`
189/// 4. Shutdown gracefully with `shutdown()`
190pub struct Endpoint {
191    pub inner: EndpointInnerRef,
192}
193
194impl EndpointInner {
195    pub fn new(
196        user_agent: String,
197        transport_layer: TransportLayer,
198        cancel_token: CancellationToken,
199        timer_interval: Option<Duration>,
200        allows: Vec<rsip::Method>,
201        option: Option<EndpointOption>,
202        inspector: Option<Box<dyn MessageInspector>>,
203    ) -> Arc<Self> {
204        let (incoming_sender, incoming_receiver) = unbounded_channel();
205        Arc::new(EndpointInner {
206            allows: Mutex::new(Some(allows)),
207            user_agent,
208            timers: Timer::new(),
209            transport_layer,
210            transactions: RwLock::new(HashMap::new()),
211            finished_transactions: RwLock::new(HashMap::new()),
212            waiting_ack: RwLock::new(HashMap::new()),
213            timer_interval: timer_interval.unwrap_or(Duration::from_millis(20)),
214            cancel_token,
215            incoming_sender,
216            incoming_receiver: Mutex::new(Some(incoming_receiver)),
217            option: option.unwrap_or_default(),
218            inspector,
219        })
220    }
221
222    pub async fn serve(self: &Arc<Self>) -> Result<()> {
223        select! {
224            _ = self.cancel_token.cancelled() => {
225            },
226            r = self.process_timer() => {
227                _ = r?
228            },
229            r = self.clone().process_transport_layer() => {
230                _ = r?
231            },
232        }
233        Ok(())
234    }
235
236    // process transport layer, receive message from transport layer
237    async fn process_transport_layer(self: Arc<Self>) -> Result<()> {
238        self.transport_layer.serve_listens().await.ok();
239
240        let mut transport_rx = match self
241            .transport_layer
242            .inner
243            .transport_rx
244            .lock()
245            .unwrap()
246            .take()
247        {
248            Some(rx) => rx,
249            None => {
250                return Err(Error::EndpointError("transport_rx not set".to_string()));
251            }
252        };
253
254        while let Some(event) = transport_rx.recv().await {
255            match event {
256                TransportEvent::Incoming(msg, connection, from) => {
257                    match self.on_received_message(msg, connection).await {
258                        Ok(()) => {}
259                        Err(e) => {
260                            warn!(addr=%from,"on_received_message error: {}", e);
261                        }
262                    }
263                }
264                TransportEvent::New(t) => {
265                    info!(addr=%t.get_addr(), "new connection");
266                }
267                TransportEvent::Closed(t) => {
268                    info!(addr=%t.get_addr(), "closed connection");
269                }
270            }
271        }
272        Ok(())
273    }
274
275    pub async fn process_timer(&self) -> Result<()> {
276        let mut ticker = tokio::time::interval(self.timer_interval);
277        loop {
278            for t in self.timers.poll(Instant::now()) {
279                match t {
280                    TransactionTimer::TimerCleanup(key) => {
281                        trace!(%key, "TimerCleanup");
282                        self.transactions
283                            .write()
284                            .as_mut()
285                            .map(|ts| ts.remove(&key))
286                            .ok();
287                        self.finished_transactions
288                            .write()
289                            .as_mut()
290                            .map(|t| t.remove(&key))
291                            .ok();
292                        continue;
293                    }
294                    _ => {}
295                }
296
297                if let Ok(Some(tu)) =
298                    { self.transactions.read().as_ref().map(|ts| ts.get(&t.key())) }
299                {
300                    match tu.send(TransactionEvent::Timer(t)) {
301                        Ok(_) => {}
302                        Err(error::SendError(t)) => match t {
303                            TransactionEvent::Timer(t) => {
304                                self.detach_transaction(t.key(), None);
305                            }
306                            _ => {}
307                        },
308                    }
309                }
310            }
311            ticker.tick().await;
312        }
313    }
314
315    // receive message from transport layer
316    pub async fn on_received_message(
317        self: &Arc<Self>,
318        msg: SipMessage,
319        connection: SipConnection,
320    ) -> Result<()> {
321        let mut key = match &msg {
322            SipMessage::Request(req) => {
323                TransactionKey::from_request(req, super::key::TransactionRole::Server)?
324            }
325            SipMessage::Response(resp) => {
326                TransactionKey::from_response(resp, super::key::TransactionRole::Client)?
327            }
328        };
329        match &msg {
330            SipMessage::Request(req) => {
331                match req.method() {
332                    rsip::Method::Ack => match DialogId::try_from(req) {
333                        Ok(dialog_id) => {
334                            let tx_key = self
335                                .waiting_ack
336                                .read()
337                                .map(|wa| wa.get(&dialog_id).cloned());
338                            if let Ok(Some(tx_key)) = tx_key {
339                                key = tx_key;
340                            }
341                        }
342                        Err(_) => {}
343                    },
344                    _ => {}
345                }
346                // check is the termination of an existing transaction
347                let last_message = self
348                    .finished_transactions
349                    .read()
350                    .unwrap()
351                    .get(&key)
352                    .cloned()
353                    .flatten();
354
355                if let Some(last_message) = last_message {
356                    connection.send(last_message, None).await?;
357                    return Ok(());
358                }
359            }
360            SipMessage::Response(resp) => {
361                let last_message = self
362                    .finished_transactions
363                    .read()
364                    .unwrap()
365                    .get(&key)
366                    .cloned()
367                    .flatten();
368
369                if let Some(mut last_message) = last_message {
370                    match last_message {
371                        SipMessage::Request(ref mut last_req) => {
372                            if last_req.method() == &rsip::Method::Ack {
373                                match resp.status_code.kind() {
374                                    rsip::StatusCodeKind::Provisional => {
375                                        return Ok(());
376                                    }
377                                    rsip::StatusCodeKind::Successful => {
378                                        if last_req.to_header()?.tag().ok().is_none() {
379                                            // don't ack 2xx response when ack is placeholder
380                                            return Ok(());
381                                        }
382                                    }
383                                    _ => {}
384                                }
385                                if let Ok(Some(tag)) = resp.to_header()?.tag() {
386                                    last_req.to_header_mut().and_then(|h| h.mut_tag(tag)).ok();
387                                }
388                            }
389                        }
390                        _ => {}
391                    }
392                    connection.send(last_message, None).await?;
393                    return Ok(());
394                }
395            }
396        };
397
398        let msg = if let Some(inspector) = &self.inspector {
399            inspector.after_received(msg)
400        } else {
401            msg
402        };
403
404        if let Some(tu) = self.transactions.read().unwrap().get(&key) {
405            tu.send(TransactionEvent::Received(msg, Some(connection)))
406                .map_err(|e| Error::TransactionError(e.to_string(), key))?;
407            return Ok(());
408        }
409        // if the transaction is not exist, create a new transaction
410        let request = match msg {
411            SipMessage::Request(req) => req,
412            SipMessage::Response(resp) => {
413                if resp.cseq_header()?.method()? != rsip::Method::Cancel {
414                    debug!(%key, "the transaction is not exist {}", resp);
415                }
416                return Ok(());
417            }
418        };
419
420        match request.method {
421            rsip::Method::Cancel => {
422                let resp = self.make_response(
423                    &request,
424                    rsip::StatusCode::CallTransactionDoesNotExist,
425                    None,
426                );
427                let resp = if let Some(ref inspector) = self.inspector {
428                    inspector.before_send(resp.into())
429                } else {
430                    resp.into()
431                };
432                connection.send(resp, None).await?;
433                return Ok(());
434            }
435            rsip::Method::Ack => return Ok(()),
436            _ => {}
437        }
438
439        let tx =
440            Transaction::new_server(key.clone(), request.clone(), self.clone(), Some(connection));
441
442        self.incoming_sender.send(tx).ok();
443        Ok(())
444    }
445
446    pub fn attach_transaction(&self, key: &TransactionKey, tu_sender: TransactionEventSender) {
447        trace!(%key, "attach transaction");
448        self.transactions
449            .write()
450            .as_mut()
451            .map(|ts| ts.insert(key.clone(), tu_sender))
452            .ok();
453    }
454
455    pub fn detach_transaction(&self, key: &TransactionKey, last_message: Option<SipMessage>) {
456        trace!(%key, "detach transaction");
457        self.transactions
458            .write()
459            .as_mut()
460            .map(|ts| ts.remove(key))
461            .ok();
462
463        if let Some(msg) = last_message {
464            self.timers.timeout(
465                self.option.t1x64,
466                TransactionTimer::TimerCleanup(key.clone()), // maybe use TimerK ???
467            );
468
469            self.finished_transactions
470                .write()
471                .as_mut()
472                .map(|ft| ft.insert(key.clone(), Some(msg)))
473                .ok();
474        }
475    }
476
477    pub fn get_addrs(&self) -> Vec<SipAddr> {
478        self.transport_layer.get_addrs()
479    }
480
481    pub fn get_record_route(&self) -> Result<rsip::typed::RecordRoute> {
482        let first_addr = self
483            .transport_layer
484            .get_addrs()
485            .first()
486            .ok_or(Error::EndpointError("not sipaddrs".to_string()))
487            .cloned()?;
488        let rr = rsip::UriWithParamsList(vec![rsip::UriWithParams {
489            uri: first_addr.into(),
490            params: vec![rsip::Param::Other("lr".into(), None)],
491        }]);
492        Ok(rr.into())
493    }
494
495    pub fn get_via(
496        &self,
497        addr: Option<crate::transport::SipAddr>,
498        branch: Option<rsip::Param>,
499    ) -> Result<rsip::typed::Via> {
500        let first_addr = match addr {
501            Some(addr) => addr,
502            None => self
503                .transport_layer
504                .get_addrs()
505                .first()
506                .ok_or(Error::EndpointError("not sipaddrs".to_string()))
507                .cloned()?,
508        };
509
510        let via = rsip::typed::Via {
511            version: rsip::Version::V2,
512            transport: first_addr.r#type.unwrap_or_default(),
513            uri: first_addr.addr.into(),
514            params: vec![
515                branch.unwrap_or_else(make_via_branch),
516                rsip::Param::Other("rport".into(), None),
517            ],
518        };
519        Ok(via)
520    }
521
522    pub fn get_stats(&self) -> EndpointStats {
523        let waiting_ack = self
524            .waiting_ack
525            .read()
526            .map(|wa| wa.len())
527            .unwrap_or_default();
528        let running_transactions = self
529            .transactions
530            .read()
531            .map(|ts| ts.len())
532            .unwrap_or_default();
533        let finished_transactions = self
534            .finished_transactions
535            .read()
536            .map(|ft| ft.len())
537            .unwrap_or_default();
538
539        EndpointStats {
540            running_transactions,
541            finished_transactions,
542            waiting_ack,
543        }
544    }
545}
546
547impl EndpointBuilder {
548    pub fn new() -> Self {
549        EndpointBuilder {
550            allows: Vec::new(),
551            user_agent: VERSION.to_string(),
552            transport_layer: None,
553            cancel_token: None,
554            timer_interval: None,
555            option: None,
556            inspector: None,
557        }
558    }
559    pub fn with_option(&mut self, option: EndpointOption) -> &mut Self {
560        self.option = Some(option);
561        self
562    }
563    pub fn with_user_agent(&mut self, user_agent: &str) -> &mut Self {
564        self.user_agent = user_agent.to_string();
565        self
566    }
567
568    pub fn with_transport_layer(&mut self, transport_layer: TransportLayer) -> &mut Self {
569        self.transport_layer.replace(transport_layer);
570        self
571    }
572
573    pub fn with_cancel_token(&mut self, cancel_token: CancellationToken) -> &mut Self {
574        self.cancel_token.replace(cancel_token);
575        self
576    }
577
578    pub fn with_timer_interval(&mut self, timer_interval: Duration) -> &mut Self {
579        self.timer_interval.replace(timer_interval);
580        self
581    }
582    pub fn with_allows(&mut self, allows: Vec<rsip::Method>) -> &mut Self {
583        self.allows = allows;
584        self
585    }
586    pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
587        self.inspector = Some(inspector);
588        self
589    }
590    pub fn build(&mut self) -> Endpoint {
591        let cancel_token = self.cancel_token.take().unwrap_or_default();
592
593        let transport_layer = self
594            .transport_layer
595            .take()
596            .unwrap_or(TransportLayer::new(cancel_token.child_token()));
597
598        let allows = self.allows.to_owned();
599        let user_agent = self.user_agent.to_owned();
600        let timer_interval = self.timer_interval.to_owned();
601        let option = self.option.take();
602        let inspector = self.inspector.take();
603
604        let core = EndpointInner::new(
605            user_agent,
606            transport_layer,
607            cancel_token,
608            timer_interval,
609            allows,
610            option,
611            inspector,
612        );
613
614        Endpoint { inner: core }
615    }
616}
617
618impl Endpoint {
619    pub async fn serve(&self) {
620        let inner = self.inner.clone();
621        match inner.serve().await {
622            Ok(()) => {
623                info!("endpoint shutdown");
624            }
625            Err(e) => {
626                warn!("endpoint serve error: {:?}", e);
627            }
628        }
629    }
630
631    pub fn shutdown(&self) {
632        info!("endpoint shutdown requested");
633        self.inner.cancel_token.cancel();
634    }
635
636    //
637    // get incoming requests from the endpoint
638    // don't call repeat!
639    pub fn incoming_transactions(&self) -> Result<TransactionReceiver> {
640        self.inner
641            .incoming_receiver
642            .lock()
643            .unwrap()
644            .take()
645            .ok_or_else(|| Error::EndpointError("incoming recevier taken".to_string()))
646    }
647
648    pub fn get_addrs(&self) -> Vec<SipAddr> {
649        self.inner.transport_layer.get_addrs()
650    }
651}