1use super::{
2 key::TransactionKey,
3 make_via_branch,
4 timer::Timer,
5 transaction::{Transaction, TransactionEvent, TransactionEventSender},
6 SipConnection, TransactionReceiver, TransactionSender, TransactionTimer,
7};
8use crate::{
9 dialog::DialogId,
10 rsip_ext::destination_from_request,
11 transport::{transport_layer::DomainResolver, SipAddr, TransportEvent, TransportLayer},
12 Error, Result, VERSION,
13};
14use async_trait::async_trait;
15use rsip::{prelude::HeadersExt, SipMessage};
16use std::{
17 collections::HashMap,
18 sync::{Arc, Mutex, RwLock},
19 time::Duration,
20};
21use tokio::{
22 select,
23 sync::mpsc::{error, unbounded_channel},
24};
25use tokio_util::sync::CancellationToken;
26use tracing::{debug, info, trace, warn};
27
28pub trait MessageInspector: Send + Sync {
29 fn before_send(&self, msg: SipMessage, dest: Option<&SipAddr>) -> SipMessage;
30 fn after_received(&self, msg: SipMessage, from: &SipAddr) -> SipMessage;
31}
32
33#[async_trait]
34pub trait TargetLocator: Send + Sync {
35 async fn locate(&self, uri: &rsip::Uri) -> Result<SipAddr>;
36}
37
38#[async_trait]
39pub trait TransportEventInspector: Send + Sync {
40 async fn handle(&self, event: TransportEvent) -> Option<TransportEvent>;
41}
42
/// Tunable timer values and identifiers for an endpoint.
///
/// The field names follow the RFC 3261 transaction timers and the defaults
/// match the values given there.
#[derive(Debug, Clone)]
pub struct EndpointOption {
    /// Base retransmission interval (default 500 ms).
    pub t1: Duration,
    /// Default 5 s.
    pub t4: Duration,
    /// Sixty-four times `t1` by default (32 s). In this file it is the delay
    /// before a finished transaction is cleaned up.
    pub t1x64: Duration,
    /// Default 180 s.
    pub timerc: Duration,
    /// Optional suffix for generated Call-IDs.
    /// NOTE(review): not read in this file; semantics inferred from the name.
    pub callid_suffix: Option<String>,
}

impl Default for EndpointOption {
    /// Returns the default timers: `t1` = 500 ms, `t4` = 5 s,
    /// `t1x64` = 64 × `t1`, `timerc` = 180 s, and no Call-ID suffix.
    fn default() -> Self {
        let t1 = Duration::from_millis(500);
        Self {
            t1,
            t4: Duration::from_secs(5),
            // Derived from `t1` so the two defaults cannot drift apart.
            t1x64: t1 * 64,
            timerc: Duration::from_secs(180),
            callid_suffix: None,
        }
    }
}
62
/// Point-in-time sizes of the endpoint's transaction tables.
///
/// Each counter is read under its own lock, so the three values are not
/// guaranteed to describe the same instant.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct EndpointStats {
    /// Number of transactions currently registered as running.
    pub running_transactions: usize,
    /// Number of finished transactions still remembered.
    pub finished_transactions: usize,
    /// Number of entries in the dialog-to-transaction table used for ACK routing.
    pub waiting_ack: usize,
}
68
69pub struct EndpointInner {
102 pub allows: Mutex<Option<Vec<rsip::Method>>>,
103 pub user_agent: String,
104 pub timers: Timer<TransactionTimer>,
105 pub transport_layer: TransportLayer,
106 pub finished_transactions: RwLock<HashMap<TransactionKey, Option<SipMessage>>>,
107 pub transactions: RwLock<HashMap<TransactionKey, TransactionEventSender>>,
108 pub waiting_ack: RwLock<HashMap<DialogId, TransactionKey>>,
109 incoming_sender: TransactionSender,
110 incoming_receiver: Mutex<Option<TransactionReceiver>>,
111 cancel_token: CancellationToken,
112 #[allow(dead_code)]
113 timer_interval: Duration,
114 pub(super) message_inspector: Option<Box<dyn MessageInspector>>,
115 pub(super) locator: Option<Box<dyn TargetLocator>>,
116 pub(super) transport_inspector: Option<Box<dyn TransportEventInspector>>,
117 pub option: EndpointOption,
118}
119pub type EndpointInnerRef = Arc<EndpointInner>;
120
121pub struct EndpointBuilder {
140 allows: Vec<rsip::Method>,
141 user_agent: String,
142 transport_layer: Option<TransportLayer>,
143 cancel_token: Option<CancellationToken>,
144 timer_interval: Option<Duration>,
145 option: Option<EndpointOption>,
146 message_inspector: Option<Box<dyn MessageInspector>>,
147 target_locator: Option<Box<dyn TargetLocator>>,
148 transport_inspector: Option<Box<dyn TransportEventInspector>>,
149 domain_resolver: Option<Box<dyn DomainResolver>>,
150}
151
152pub struct Endpoint {
204 pub inner: EndpointInnerRef,
205}
206
207impl EndpointInner {
208 pub fn new(
209 user_agent: String,
210 transport_layer: TransportLayer,
211 cancel_token: CancellationToken,
212 timer_interval: Option<Duration>,
213 allows: Vec<rsip::Method>,
214 option: Option<EndpointOption>,
215 message_inspector: Option<Box<dyn MessageInspector>>,
216 locator: Option<Box<dyn TargetLocator>>,
217 transport_inspector: Option<Box<dyn TransportEventInspector>>,
218 ) -> Arc<Self> {
219 let (incoming_sender, incoming_receiver) = unbounded_channel();
220 Arc::new(EndpointInner {
221 allows: Mutex::new(Some(allows)),
222 user_agent,
223 timers: Timer::new(),
224 transport_layer,
225 transactions: RwLock::new(HashMap::new()),
226 finished_transactions: RwLock::new(HashMap::new()),
227 waiting_ack: RwLock::new(HashMap::new()),
228 timer_interval: timer_interval.unwrap_or(Duration::from_millis(20)),
229 cancel_token,
230 incoming_sender,
231 incoming_receiver: Mutex::new(Some(incoming_receiver)),
232 option: option.unwrap_or_default(),
233 message_inspector,
234 locator,
235 transport_inspector,
236 })
237 }
238
239 pub async fn serve(self: &Arc<Self>) -> Result<()> {
240 select! {
241 _ = self.cancel_token.cancelled() => {},
242 _ = self.process_timer() => {},
243 r = self.clone().process_transport_layer() => {
244 _ = r?;
245 },
246 }
247 Ok(())
248 }
249
250 async fn process_transport_layer(self: Arc<Self>) -> Result<()> {
252 self.transport_layer.serve_listens().await.ok();
253
254 let mut transport_rx = match self
255 .transport_layer
256 .inner
257 .transport_rx
258 .lock()
259 .unwrap()
260 .take()
261 {
262 Some(rx) => rx,
263 None => {
264 return Err(Error::EndpointError("transport_rx not set".to_string()));
265 }
266 };
267
268 while let Some(mut event) = transport_rx.recv().await {
269 if let Some(transport_inspector) = &self.transport_inspector {
270 match transport_inspector.handle(event).await {
271 Some(e) => {
272 event = e;
273 }
274 None => {
275 continue;
276 }
277 }
278 }
279
280 match event {
281 TransportEvent::Incoming(msg, connection, from) => {
282 match self.on_received_message(msg, connection, &from).await {
283 Ok(()) => {}
284 Err(e) => {
285 warn!(addr = %from, error = %e, "on_received_message error");
286 }
287 }
288 }
289 TransportEvent::New(t) => {
290 debug!(addr=%t.get_addr(), "new connection");
291 }
292 TransportEvent::Closed(t) => {
293 debug!(addr=%t.get_addr(), "closed connection");
294 }
295 }
296 }
297 Ok(())
298 }
299
300 pub async fn process_timer(&self) {
301 loop {
302 for t in self.timers.wait_for_ready().await.into_iter() {
303 match t {
304 TransactionTimer::TimerCleanup(key) => {
305 trace!(%key, "TimerCleanup");
306 self.transactions
307 .write()
308 .as_mut()
309 .map(|ts| ts.remove(&key))
310 .ok();
311 self.finished_transactions
312 .write()
313 .as_mut()
314 .map(|t| t.remove(&key))
315 .ok();
316 continue;
317 }
318 _ => {}
319 }
320
321 if let Ok(Some(tu)) =
322 { self.transactions.read().as_ref().map(|ts| ts.get(&t.key())) }
323 {
324 match tu.send(TransactionEvent::Timer(t)) {
325 Ok(_) => {}
326 Err(error::SendError(t)) => match t {
327 TransactionEvent::Timer(t) => {
328 self.detach_transaction(t.key(), None);
329 }
330 _ => {}
331 },
332 }
333 }
334 }
335 }
336 }
337
338 pub async fn get_destination_from_request(&self, req: &rsip::Request) -> Option<SipAddr> {
340 let (transport, host_with_port) =
341 SipConnection::parse_target_from_via(req.via_header().ok()?).ok()?;
342
343 let sip_addr = SipAddr {
344 r#type: Some(transport),
345 addr: host_with_port,
346 };
347
348 if matches!(sip_addr.addr.host, rsip::Host::Domain(_)) {
349 return self
350 .transport_layer
351 .inner
352 .domain_resolver
353 .resolve(&sip_addr)
354 .await
355 .ok();
356 }
357 Some(sip_addr)
358 }
359
360 pub async fn on_received_message(
362 self: &Arc<Self>,
363 msg: SipMessage,
364 connection: SipConnection,
365 from: &SipAddr,
366 ) -> Result<()> {
367 let mut key = match &msg {
368 SipMessage::Request(req) => {
369 TransactionKey::from_request(req, super::key::TransactionRole::Server)?
370 }
371 SipMessage::Response(resp) => {
372 TransactionKey::from_response(resp, super::key::TransactionRole::Client)?
373 }
374 };
375 match &msg {
376 SipMessage::Request(req) => {
377 match req.method() {
378 rsip::Method::Ack => {
379 match DialogId::try_from((req, super::key::TransactionRole::Server)) {
380 Ok(dialog_id) => {
381 let tx_key = self
382 .waiting_ack
383 .read()
384 .map(|wa| wa.get(&dialog_id).cloned());
385 if let Ok(Some(tx_key)) = tx_key {
386 key = tx_key;
387 }
388 }
389 Err(_) => {}
390 }
391 }
392 _ => {}
393 }
394 let last_message = self
396 .finished_transactions
397 .read()
398 .unwrap()
399 .get(&key)
400 .cloned()
401 .flatten();
402
403 if let Some(last_message) = last_message {
404 let dest = if !connection.is_reliable() {
405 self.get_destination_from_request(req).await
406 } else {
407 None
408 };
409 connection.send(last_message, dest.as_ref()).await?;
410 return Ok(());
411 }
412 }
413 SipMessage::Response(resp) => {
414 let last_message = self
415 .finished_transactions
416 .read()
417 .unwrap()
418 .get(&key)
419 .cloned()
420 .flatten();
421
422 if let Some(mut last_message) = last_message {
423 match last_message {
424 SipMessage::Request(ref mut last_req) => {
425 if last_req.method() == &rsip::Method::Ack {
426 match resp.status_code.kind() {
427 rsip::StatusCodeKind::Provisional => {
428 return Ok(());
429 }
430 rsip::StatusCodeKind::Successful => {
431 if last_req.to_header()?.tag().ok().is_none() {
432 return Ok(());
434 }
435 }
436 _ => {}
437 }
438
439 if let Ok(Some(tag)) = resp.to_header().and_then(|h| h.tag()) {
440 last_req.to_header_mut().and_then(|h| h.mut_tag(tag)).ok();
441 }
442
443 if let rsip::StatusCodeKind::RequestFailure =
444 resp.status_code.kind()
445 {
446 connection.send(last_message, Some(from)).await?;
448 return Ok(());
449 }
450
451 let dest = match destination_from_request(&last_req)
452 .and_then(|uri| SipAddr::try_from(uri).ok())
453 {
454 Some(addr)
455 if matches!(addr.addr.host, rsip::Host::Domain(_)) =>
456 {
457 self.transport_layer
458 .inner
459 .domain_resolver
460 .resolve(&addr)
461 .await
462 .ok()
463 }
464 addr => addr,
465 };
466
467 connection.send(last_message, dest.as_ref()).await?;
468 }
469 }
470 _ => {}
471 }
472 return Ok(());
473 }
474 }
475 };
476
477 let msg = if let Some(inspector) = &self.message_inspector {
478 inspector.after_received(msg, from)
479 } else {
480 msg
481 };
482
483 if let Some(tu) = self.transactions.read().unwrap().get(&key) {
484 tu.send(TransactionEvent::Received(msg, Some(connection)))
485 .map_err(|e| Error::TransactionError(e.to_string(), key))?;
486 return Ok(());
487 }
488 let request = match msg {
490 SipMessage::Request(req) => req,
491 SipMessage::Response(resp) => {
492 if resp.cseq_header()?.method()? != rsip::Method::Cancel {
493 debug!(%key, response = %resp, "the transaction does not exist");
494 }
495 return Ok(());
496 }
497 };
498
499 match request.method {
500 rsip::Method::Cancel => {
501 let resp = self.make_response(
502 &request,
503 rsip::StatusCode::CallTransactionDoesNotExist,
504 None,
505 );
506 let resp = if let Some(ref inspector) = self.message_inspector {
507 inspector.before_send(resp.into(), None)
508 } else {
509 resp.into()
510 };
511
512 let dest = if !connection.is_reliable() {
513 self.get_destination_from_request(&request).await
514 } else {
515 None
516 };
517 connection.send(resp, dest.as_ref()).await?;
518 return Ok(());
519 }
520 rsip::Method::Ack => return Ok(()),
521 _ => {}
522 }
523
524 let tx =
525 Transaction::new_server(key.clone(), request.clone(), self.clone(), Some(connection));
526
527 self.incoming_sender.send(tx).ok();
528 Ok(())
529 }
530
531 pub fn attach_transaction(&self, key: &TransactionKey, tu_sender: TransactionEventSender) {
532 trace!(%key, "attach transaction");
533 self.transactions
534 .write()
535 .as_mut()
536 .map(|ts| ts.insert(key.clone(), tu_sender))
537 .ok();
538 }
539
540 pub fn detach_transaction(&self, key: &TransactionKey, last_message: Option<SipMessage>) {
541 trace!(%key, "detach transaction");
542 self.transactions
543 .write()
544 .as_mut()
545 .map(|ts| ts.remove(key))
546 .ok();
547
548 if let Some(msg) = last_message {
549 self.timers.timeout(
550 self.option.t1x64,
551 TransactionTimer::TimerCleanup(key.clone()), );
553
554 self.finished_transactions
555 .write()
556 .as_mut()
557 .map(|ft| ft.insert(key.clone(), Some(msg)))
558 .ok();
559 }
560 }
561
562 pub fn get_addrs(&self) -> Vec<SipAddr> {
563 self.transport_layer.get_addrs()
564 }
565
566 pub fn get_record_route(&self) -> Result<rsip::typed::RecordRoute> {
567 let first_addr = self
568 .transport_layer
569 .get_addrs()
570 .first()
571 .ok_or(Error::EndpointError("not sipaddrs".to_string()))
572 .cloned()?;
573 let rr = rsip::UriWithParamsList(vec![rsip::UriWithParams {
574 uri: first_addr.into(),
575 params: vec![rsip::Param::Other("lr".into(), None)],
576 }]);
577 Ok(rr.into())
578 }
579
580 pub fn get_via(
581 &self,
582 addr: Option<crate::transport::SipAddr>,
583 branch: Option<rsip::Param>,
584 ) -> Result<rsip::typed::Via> {
585 let first_addr = match addr {
586 Some(addr) => addr,
587 None => self
588 .transport_layer
589 .get_addrs()
590 .first()
591 .ok_or(Error::EndpointError("not sipaddrs".to_string()))
592 .cloned()?,
593 };
594
595 let via = rsip::typed::Via {
596 version: rsip::Version::V2,
597 transport: first_addr.r#type.unwrap_or_default(),
598 uri: first_addr.addr.into(),
599 params: vec![
600 branch.unwrap_or_else(make_via_branch),
601 rsip::Param::Other("rport".into(), None),
602 ],
603 };
604 Ok(via)
605 }
606
607 pub fn get_running_transactions(&self) -> Option<Vec<TransactionKey>> {
608 self.transactions
609 .read()
610 .map(|ts| ts.keys().cloned().collect())
611 .ok()
612 }
613
614 pub fn get_stats(&self) -> EndpointStats {
615 let waiting_ack = self
616 .waiting_ack
617 .read()
618 .map(|wa| wa.len())
619 .unwrap_or_default();
620 let running_transactions = self
621 .transactions
622 .read()
623 .map(|ts| ts.len())
624 .unwrap_or_default();
625 let finished_transactions = self
626 .finished_transactions
627 .read()
628 .map(|ft| ft.len())
629 .unwrap_or_default();
630
631 EndpointStats {
632 running_transactions,
633 finished_transactions,
634 waiting_ack,
635 }
636 }
637}
638
639impl EndpointBuilder {
640 pub fn new() -> Self {
641 EndpointBuilder {
642 allows: Vec::new(),
643 user_agent: VERSION.to_string(),
644 transport_layer: None,
645 cancel_token: None,
646 timer_interval: None,
647 option: None,
648 message_inspector: None,
649 target_locator: None,
650 transport_inspector: None,
651 domain_resolver: None,
652 }
653 }
654
655 pub fn with_option(&mut self, option: EndpointOption) -> &mut Self {
656 self.option = Some(option);
657 self
658 }
659
660 pub fn with_user_agent(&mut self, user_agent: &str) -> &mut Self {
661 self.user_agent = user_agent.to_string();
662 self
663 }
664
665 pub fn with_transport_layer(&mut self, transport_layer: TransportLayer) -> &mut Self {
666 self.transport_layer.replace(transport_layer);
667 self
668 }
669
670 pub fn with_cancel_token(&mut self, cancel_token: CancellationToken) -> &mut Self {
671 self.cancel_token.replace(cancel_token);
672 self
673 }
674
675 pub fn with_timer_interval(&mut self, timer_interval: Duration) -> &mut Self {
676 self.timer_interval.replace(timer_interval);
677 self
678 }
679 pub fn with_allows(&mut self, allows: Vec<rsip::Method>) -> &mut Self {
680 self.allows = allows;
681 self
682 }
683 pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
684 self.message_inspector = Some(inspector);
685 self
686 }
687 pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
688 self.target_locator = Some(locator);
689 self
690 }
691
692 pub fn with_transport_inspector(
693 &mut self,
694 inspector: Box<dyn TransportEventInspector>,
695 ) -> &mut Self {
696 self.transport_inspector = Some(inspector);
697 self
698 }
699
700 pub fn with_domain_resolver(&mut self, resolver: Box<dyn DomainResolver>) -> &mut Self {
701 self.domain_resolver = Some(resolver);
702 self
703 }
704
705 pub fn build(&mut self) -> Endpoint {
706 let cancel_token = self.cancel_token.take().unwrap_or_default();
707 let transport_layer = self.transport_layer.take().unwrap_or_else(|| {
708 if let Some(resolver) = self.domain_resolver.take() {
709 TransportLayer::new_with_domain_resolver(cancel_token.clone(), resolver)
710 } else {
711 TransportLayer::new(cancel_token.clone())
712 }
713 });
714
715 let allows = self.allows.to_owned();
716 let user_agent = self.user_agent.to_owned();
717 let timer_interval = self.timer_interval.to_owned();
718 let option = self.option.take();
719 let message_inspector = self.message_inspector.take();
720 let locator = self.target_locator.take();
721 let transport_inspector = self.transport_inspector.take();
722
723 let core = EndpointInner::new(
724 user_agent,
725 transport_layer,
726 cancel_token,
727 timer_interval,
728 allows,
729 option,
730 message_inspector,
731 locator,
732 transport_inspector,
733 );
734
735 Endpoint { inner: core }
736 }
737}
738
739impl Endpoint {
740 pub async fn serve(&self) {
741 let inner = self.inner.clone();
742 match inner.serve().await {
743 Ok(()) => {
744 info!("endpoint shutdown");
745 }
746 Err(e) => {
747 warn!(error = ?e, "endpoint serve error");
748 }
749 }
750 }
751
752 pub fn shutdown(&self) {
753 info!("endpoint shutdown requested");
754 self.inner.cancel_token.cancel();
755 }
756
757 pub fn incoming_transactions(&self) -> Result<TransactionReceiver> {
761 self.inner
762 .incoming_receiver
763 .lock()
764 .unwrap()
765 .take()
766 .ok_or_else(|| Error::EndpointError("incoming recevier taken".to_string()))
767 }
768
769 pub fn get_addrs(&self) -> Vec<SipAddr> {
770 self.inner.transport_layer.get_addrs()
771 }
772}