ftth_rsipstack/transaction/
endpoint.rs

1use super::{
2    key::TransactionKey,
3    make_via_branch,
4    timer::Timer,
5    transaction::{Transaction, TransactionEvent, TransactionEventSender},
6    SipConnection, TransactionReceiver, TransactionSender, TransactionTimer,
7};
8use crate::rsip;
9use crate::{
10    dialog::DialogId,
11    transport::{SipAddr, TransportEvent, TransportLayer},
12    Error, Result, VERSION,
13};
14use rsip::{prelude::HeadersExt, SipMessage};
15use std::{
16    collections::HashMap,
17    sync::{Arc, Mutex, RwLock},
18    time::{Duration, Instant},
19};
20use tokio::{
21    select,
22    sync::mpsc::{error, unbounded_channel},
23};
24use tokio_util::sync::CancellationToken;
25use tracing::{debug, info, trace, warn};
26
27pub trait MessageInspector: Send + Sync {
28    fn before_send(&self, msg: SipMessage) -> SipMessage;
29    fn after_received(&self, msg: SipMessage) -> SipMessage;
30}
31
32#[non_exhaustive]
33pub struct EndpointOption {
34    pub t1: Duration,
35    pub t4: Duration,
36    pub t1x64: Duration,
37    pub timerc: Duration,
38    pub ignore_out_of_dialog_option: bool,
39    pub callid_suffix: Option<String>,
40    pub dialog_keepalive_duration: Option<Duration>,
41    pub follow_record_route: bool,
42}
43
44impl Default for EndpointOption {
45    fn default() -> Self {
46        EndpointOption {
47            t1: Duration::from_millis(500),
48            t4: Duration::from_secs(4),
49            t1x64: Duration::from_millis(64 * 500),
50            timerc: Duration::from_secs(180),
51            ignore_out_of_dialog_option: true,
52            callid_suffix: None,
53            dialog_keepalive_duration: Some(Duration::from_secs(60)),
54            follow_record_route: true,
55        }
56    }
57}
58
59pub struct EndpointStats {
60    pub running_transactions: usize,
61    pub finished_transactions: usize,
62    pub waiting_ack: usize,
63}
64
65/// SIP Endpoint Core Implementation
66///
67/// `EndpointInner` is the core implementation of a SIP endpoint that manages
68/// transactions, timers, and transport layer communication. It serves as the
69/// central coordination point for all SIP protocol operations.
70///
71/// # Key Responsibilities
72///
73/// * Managing active SIP transactions
74/// * Handling SIP timers (Timer A, B, D, E, F, G, K)
75/// * Coordinating with the transport layer
76/// * Processing incoming and outgoing SIP messages
77/// * Maintaining transaction state and cleanup
78///
79/// # Fields
80///
81/// * `allows` - List of supported SIP methods
82/// * `user_agent` - User-Agent header value for outgoing messages
83/// * `timers` - Timer management system for SIP timers
84/// * `transport_layer` - Transport layer for network communication
85/// * `finished_transactions` - Cache of completed transactions
86/// * `transactions` - Active transaction senders
87/// * `incoming_sender` - Channel for incoming transaction notifications
88/// * `cancel_token` - Cancellation token for graceful shutdown
89/// * `timer_interval` - Interval for timer processing
90/// * `t1`, `t4`, `t1x64` - SIP timer values as per RFC 3261
91///
92/// # Timer Values
93///
94/// * `t1` - RTT estimate (default 500ms)
95/// * `t4` - Maximum duration a message will remain in the network (default 4s)
96/// * `t1x64` - Maximum retransmission timeout (default 32s)
97pub struct EndpointInner {
98    pub allows: Mutex<Option<Vec<rsip::Method>>>,
99    pub user_agent: String,
100    pub timers: Timer<TransactionTimer>,
101    pub transport_layer: TransportLayer,
102    pub finished_transactions: RwLock<HashMap<TransactionKey, Option<SipMessage>>>,
103    pub transactions: RwLock<HashMap<TransactionKey, TransactionEventSender>>,
104    pub waiting_ack: RwLock<HashMap<DialogId, TransactionKey>>,
105    incoming_sender: TransactionSender,
106    incoming_receiver: Mutex<Option<TransactionReceiver>>,
107    cancel_token: CancellationToken,
108    timer_interval: Duration,
109    pub(super) inspector: Option<Box<dyn MessageInspector>>,
110    pub option: EndpointOption,
111}
112pub type EndpointInnerRef = Arc<EndpointInner>;
113
114/// SIP Endpoint Builder
115///
116/// `EndpointBuilder` provides a fluent interface for constructing SIP endpoints
117/// with custom configuration. It follows the builder pattern to allow flexible
118/// endpoint configuration.
119///
120/// # Examples
121///
122/// ```rust
123/// use ftth_rsipstack::EndpointBuilder;
124/// use std::time::Duration;
125///
126/// let endpoint = EndpointBuilder::new()
127///     .with_user_agent("MyApp/1.0")
128///     .with_timer_interval(Duration::from_millis(10))
129///     .with_allows(vec![rsip::Method::Invite, rsip::Method::Bye])
130///     .build();
131/// ```
132pub struct EndpointBuilder {
133    allows: Vec<rsip::Method>,
134    user_agent: String,
135    transport_layer: Option<TransportLayer>,
136    cancel_token: Option<CancellationToken>,
137    timer_interval: Option<Duration>,
138    option: Option<EndpointOption>,
139    inspector: Option<Box<dyn MessageInspector>>,
140}
141
142/// SIP Endpoint
143///
144/// `Endpoint` is the main entry point for SIP protocol operations. It provides
145/// a high-level interface for creating and managing SIP transactions, handling
146/// incoming requests, and coordinating with the transport layer.
147///
148/// # Key Features
149///
150/// * Transaction management and lifecycle
151/// * Automatic timer handling per RFC 3261
152/// * Transport layer abstraction
153/// * Graceful shutdown support
154/// * Incoming request processing
155///
156/// # Examples
157///
158/// ```rust,no_run
159/// use ftth_rsipstack::EndpointBuilder;
160/// use tokio_util::sync::CancellationToken;
161///
162/// #[tokio::main]
163/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
164///     let endpoint = EndpointBuilder::new()
165///         .with_user_agent("MyApp/1.0")
166///         .build();
167///     
168///     // Get incoming transactions
169///     let mut incoming = endpoint.incoming_transactions().expect("incoming_transactions");
170///     
171///     // Start the endpoint
172///     let endpoint_inner = endpoint.inner.clone();
173///     tokio::spawn(async move {
174///          endpoint_inner.serve().await.ok();
175///     });
176///     
177///     // Process incoming transactions
178///     while let Some(transaction) = incoming.recv().await {
179///         // Handle transaction
180///         break; // Exit for example
181///     }
182///     
183///     Ok(())
184/// }
185/// ```
186///
187/// # Lifecycle
188///
189/// 1. Create endpoint using `EndpointBuilder`
190/// 2. Start serving with `serve()` method
191/// 3. Process incoming transactions via `incoming_transactions()`
192/// 4. Shutdown gracefully with `shutdown()`
193pub struct Endpoint {
194    pub inner: EndpointInnerRef,
195}
196
197impl EndpointInner {
198    pub fn new(
199        user_agent: String,
200        transport_layer: TransportLayer,
201        cancel_token: CancellationToken,
202        timer_interval: Option<Duration>,
203        allows: Vec<rsip::Method>,
204        option: Option<EndpointOption>,
205        inspector: Option<Box<dyn MessageInspector>>,
206    ) -> Arc<Self> {
207        let (incoming_sender, incoming_receiver) = unbounded_channel();
208        Arc::new(EndpointInner {
209            allows: Mutex::new(Some(allows)),
210            user_agent,
211            timers: Timer::new(),
212            transport_layer,
213            transactions: RwLock::new(HashMap::new()),
214            finished_transactions: RwLock::new(HashMap::new()),
215            waiting_ack: RwLock::new(HashMap::new()),
216            timer_interval: timer_interval.unwrap_or(Duration::from_millis(20)),
217            cancel_token,
218            incoming_sender,
219            incoming_receiver: Mutex::new(Some(incoming_receiver)),
220            option: option.unwrap_or_default(),
221            inspector,
222        })
223    }
224
225    pub async fn serve(self: &Arc<Self>) -> Result<()> {
226        select! {
227            _ = self.cancel_token.cancelled() => {
228            },
229            r = self.process_timer() => {
230                _ = r?
231            },
232            r = self.clone().process_transport_layer() => {
233                _ = r?
234            },
235        }
236        Ok(())
237    }
238
239    // process transport layer, receive message from transport layer
240    async fn process_transport_layer(self: Arc<Self>) -> Result<()> {
241        self.transport_layer.serve_listens().await.ok();
242
243        let mut transport_rx = match self
244            .transport_layer
245            .inner
246            .transport_rx
247            .lock()
248            .unwrap()
249            .take()
250        {
251            Some(rx) => rx,
252            None => {
253                return Err(Error::EndpointError("transport_rx not set".to_string()));
254            }
255        };
256
257        while let Some(event) = transport_rx.recv().await {
258            match event {
259                TransportEvent::Incoming(msg, connection, from) => {
260                    match self.on_received_message(msg, connection).await {
261                        Ok(()) => {}
262                        Err(e) => {
263                            warn!(addr=%from,"on_received_message error: {}", e);
264                        }
265                    }
266                }
267                TransportEvent::New(t) => {
268                    info!(addr=%t.get_addr(), "new connection");
269                }
270                TransportEvent::Closed(t) => {
271                    info!(addr=%t.get_addr(), "closed connection");
272                }
273            }
274        }
275        Ok(())
276    }
277
278    pub async fn process_timer(&self) -> Result<()> {
279        let mut ticker = tokio::time::interval(self.timer_interval);
280        loop {
281            for t in self.timers.poll(Instant::now()) {
282                match t {
283                    TransactionTimer::TimerCleanup(key) => {
284                        trace!(%key, "TimerCleanup");
285                        self.transactions
286                            .write()
287                            .as_mut()
288                            .map(|ts| ts.remove(&key))
289                            .ok();
290                        self.finished_transactions
291                            .write()
292                            .as_mut()
293                            .map(|t| t.remove(&key))
294                            .ok();
295                        continue;
296                    }
297                    _ => {}
298                }
299
300                if let Ok(Some(tu)) =
301                    { self.transactions.read().as_ref().map(|ts| ts.get(&t.key())) }
302                {
303                    match tu.send(TransactionEvent::Timer(t)) {
304                        Ok(_) => {}
305                        Err(error::SendError(t)) => match t {
306                            TransactionEvent::Timer(t) => {
307                                self.detach_transaction(t.key(), None);
308                            }
309                            _ => {}
310                        },
311                    }
312                }
313            }
314            ticker.tick().await;
315        }
316    }
317
318    // receive message from transport layer
319    pub async fn on_received_message(
320        self: &Arc<Self>,
321        msg: SipMessage,
322        connection: SipConnection,
323    ) -> Result<()> {
324        let mut key = match &msg {
325            SipMessage::Request(req) => {
326                TransactionKey::from_request(req, super::key::TransactionRole::Server)?
327            }
328            SipMessage::Response(resp) => {
329                TransactionKey::from_response(resp, super::key::TransactionRole::Client)?
330            }
331        };
332        match &msg {
333            SipMessage::Request(req) => {
334                match req.method() {
335                    rsip::Method::Ack => match DialogId::try_from(req) {
336                        Ok(dialog_id) => {
337                            let tx_key = self
338                                .waiting_ack
339                                .read()
340                                .map(|wa| wa.get(&dialog_id).cloned());
341                            if let Ok(Some(tx_key)) = tx_key {
342                                key = tx_key;
343                            }
344                        }
345                        Err(_) => {}
346                    },
347                    _ => {}
348                }
349                // check is the termination of an existing transaction
350                let last_message = self
351                    .finished_transactions
352                    .read()
353                    .unwrap()
354                    .get(&key)
355                    .cloned()
356                    .flatten();
357
358                if let Some(last_message) = last_message {
359                    connection.send(last_message, None).await?;
360                    return Ok(());
361                }
362            }
363            SipMessage::Response(resp) => {
364                let last_message = self
365                    .finished_transactions
366                    .read()
367                    .unwrap()
368                    .get(&key)
369                    .cloned()
370                    .flatten();
371
372                if let Some(mut last_message) = last_message {
373                    match last_message {
374                        SipMessage::Request(ref mut last_req) => {
375                            if last_req.method() == &rsip::Method::Ack {
376                                match resp.status_code.kind() {
377                                    rsip::StatusCodeKind::Provisional => {
378                                        return Ok(());
379                                    }
380                                    rsip::StatusCodeKind::Successful => {
381                                        if last_req.to_header()?.tag().ok().is_none() {
382                                            // don't ack 2xx response when ack is placeholder
383                                            return Ok(());
384                                        }
385                                    }
386                                    _ => {}
387                                }
388                                if let Ok(Some(tag)) = resp.to_header()?.tag() {
389                                    last_req.to_header_mut().and_then(|h| h.mut_tag(tag)).ok();
390                                }
391                            }
392                        }
393                        _ => {}
394                    }
395                    connection.send(last_message, None).await?;
396                    return Ok(());
397                }
398            }
399        };
400
401        let msg = if let Some(inspector) = &self.inspector {
402            inspector.after_received(msg)
403        } else {
404            msg
405        };
406
407        if let Some(tu) = self.transactions.read().unwrap().get(&key) {
408            tu.send(TransactionEvent::Received(msg, Some(connection)))
409                .map_err(|e| Error::TransactionError(e.to_string(), key))?;
410            return Ok(());
411        }
412        // if the transaction is not exist, create a new transaction
413        let request = match msg {
414            SipMessage::Request(req) => req,
415            SipMessage::Response(resp) => {
416                if resp.cseq_header()?.method()? != rsip::Method::Cancel {
417                    debug!(%key, "the transaction is not exist {}", resp);
418                }
419                return Ok(());
420            }
421        };
422
423        match request.method {
424            rsip::Method::Cancel => {
425                let resp = self.make_response(
426                    &request,
427                    rsip::StatusCode::CallTransactionDoesNotExist,
428                    None,
429                );
430                let resp = if let Some(ref inspector) = self.inspector {
431                    inspector.before_send(resp.into())
432                } else {
433                    resp.into()
434                };
435                connection.send(resp, None).await?;
436                return Ok(());
437            }
438            rsip::Method::Ack => return Ok(()),
439            _ => {}
440        }
441
442        let tx =
443            Transaction::new_server(key.clone(), request.clone(), self.clone(), Some(connection));
444
445        self.incoming_sender.send(tx).ok();
446        Ok(())
447    }
448
449    pub fn attach_transaction(&self, key: &TransactionKey, tu_sender: TransactionEventSender) {
450        trace!(%key, "attach transaction");
451        self.transactions
452            .write()
453            .as_mut()
454            .map(|ts| ts.insert(key.clone(), tu_sender))
455            .ok();
456    }
457
458    pub fn detach_transaction(&self, key: &TransactionKey, last_message: Option<SipMessage>) {
459        trace!(%key, "detach transaction");
460        self.transactions
461            .write()
462            .as_mut()
463            .map(|ts| ts.remove(key))
464            .ok();
465
466        if let Some(msg) = last_message {
467            self.timers.timeout(
468                self.option.t1x64,
469                TransactionTimer::TimerCleanup(key.clone()), // maybe use TimerK ???
470            );
471
472            self.finished_transactions
473                .write()
474                .as_mut()
475                .map(|ft| ft.insert(key.clone(), Some(msg)))
476                .ok();
477        }
478    }
479
480    pub fn get_addrs(&self) -> Vec<SipAddr> {
481        self.transport_layer.get_addrs()
482    }
483
484    pub fn get_record_route(&self) -> Result<rsip::typed::RecordRoute> {
485        let first_addr = self
486            .transport_layer
487            .get_addrs()
488            .first()
489            .ok_or(Error::EndpointError("not sipaddrs".to_string()))
490            .cloned()?;
491        let rr = rsip::UriWithParamsList(vec![rsip::UriWithParams {
492            uri: first_addr.into(),
493            params: vec![rsip::Param::Other("lr".into(), None)],
494        }]);
495        Ok(rr.into())
496    }
497
498    pub fn get_via(
499        &self,
500        addr: Option<crate::transport::SipAddr>,
501        branch: Option<rsip::Param>,
502    ) -> Result<rsip::typed::Via> {
503        let first_addr = match addr {
504            Some(addr) => addr,
505            None => self
506                .transport_layer
507                .get_addrs()
508                .first()
509                .ok_or(Error::EndpointError("not sipaddrs".to_string()))
510                .cloned()?,
511        };
512
513        let via = rsip::typed::Via {
514            version: rsip::Version::V2,
515            transport: first_addr.r#type.unwrap_or_default(),
516            uri: first_addr.addr.into(),
517            params: vec![
518                branch.unwrap_or_else(make_via_branch),
519                rsip::Param::Other("rport".into(), None),
520            ],
521        };
522        Ok(via)
523    }
524
525    pub fn get_stats(&self) -> EndpointStats {
526        let waiting_ack = self
527            .waiting_ack
528            .read()
529            .map(|wa| wa.len())
530            .unwrap_or_default();
531        let running_transactions = self
532            .transactions
533            .read()
534            .map(|ts| ts.len())
535            .unwrap_or_default();
536        let finished_transactions = self
537            .finished_transactions
538            .read()
539            .map(|ft| ft.len())
540            .unwrap_or_default();
541
542        EndpointStats {
543            running_transactions,
544            finished_transactions,
545            waiting_ack,
546        }
547    }
548}
549
550impl EndpointBuilder {
551    pub fn new() -> Self {
552        EndpointBuilder {
553            allows: Vec::new(),
554            user_agent: VERSION.to_string(),
555            transport_layer: None,
556            cancel_token: None,
557            timer_interval: None,
558            option: None,
559            inspector: None,
560        }
561    }
562    pub fn with_option(&mut self, option: EndpointOption) -> &mut Self {
563        self.option = Some(option);
564        self
565    }
566    pub fn with_user_agent(&mut self, user_agent: &str) -> &mut Self {
567        self.user_agent = user_agent.to_string();
568        self
569    }
570
571    pub fn with_transport_layer(&mut self, transport_layer: TransportLayer) -> &mut Self {
572        self.transport_layer.replace(transport_layer);
573        self
574    }
575
576    pub fn with_cancel_token(&mut self, cancel_token: CancellationToken) -> &mut Self {
577        self.cancel_token.replace(cancel_token);
578        self
579    }
580
581    pub fn with_timer_interval(&mut self, timer_interval: Duration) -> &mut Self {
582        self.timer_interval.replace(timer_interval);
583        self
584    }
585    pub fn with_allows(&mut self, allows: Vec<rsip::Method>) -> &mut Self {
586        self.allows = allows;
587        self
588    }
589    pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
590        self.inspector = Some(inspector);
591        self
592    }
593    pub fn follow_record_route(&mut self, enabled: bool) -> &mut Self {
594        self.option
595            .get_or_insert_with(EndpointOption::default)
596            .follow_record_route = enabled;
597        self
598    }
599    pub fn build(&mut self) -> Endpoint {
600        let cancel_token = self.cancel_token.take().unwrap_or_default();
601
602        let transport_layer = self
603            .transport_layer
604            .take()
605            .unwrap_or(TransportLayer::new(cancel_token.child_token()));
606
607        let allows = self.allows.to_owned();
608        let user_agent = self.user_agent.to_owned();
609        let timer_interval = self.timer_interval.to_owned();
610        let option = self.option.take();
611        let inspector = self.inspector.take();
612
613        let core = EndpointInner::new(
614            user_agent,
615            transport_layer,
616            cancel_token,
617            timer_interval,
618            allows,
619            option,
620            inspector,
621        );
622
623        Endpoint { inner: core }
624    }
625}
626
627impl Endpoint {
628    pub async fn serve(&self) {
629        let inner = self.inner.clone();
630        match inner.serve().await {
631            Ok(()) => {
632                info!("endpoint shutdown");
633            }
634            Err(e) => {
635                warn!("endpoint serve error: {:?}", e);
636            }
637        }
638    }
639
640    pub fn shutdown(&self) {
641        info!("endpoint shutdown requested");
642        self.inner.cancel_token.cancel();
643    }
644
645    //
646    // get incoming requests from the endpoint
647    // don't call repeat!
648    pub fn incoming_transactions(&self) -> Result<TransactionReceiver> {
649        self.inner
650            .incoming_receiver
651            .lock()
652            .unwrap()
653            .take()
654            .ok_or_else(|| Error::EndpointError("incoming recevier taken".to_string()))
655    }
656
657    pub fn get_addrs(&self) -> Vec<SipAddr> {
658        self.inner.transport_layer.get_addrs()
659    }
660}