1use super::{
2 key::TransactionKey,
3 make_via_branch,
4 timer::Timer,
5 transaction::{Transaction, TransactionEvent, TransactionEventSender},
6 SipConnection, TransactionReceiver, TransactionSender, TransactionTimer,
7};
8use crate::rsip;
9use crate::{
10 dialog::DialogId,
11 transport::{SipAddr, TransportEvent, TransportLayer},
12 Error, Result, VERSION,
13};
14use rsip::{prelude::HeadersExt, SipMessage};
15use std::{
16 collections::HashMap,
17 sync::{Arc, Mutex, RwLock},
18 time::{Duration, Instant},
19};
20use tokio::{
21 select,
22 sync::mpsc::{error, unbounded_channel},
23};
24use tokio_util::sync::CancellationToken;
25use tracing::{debug, info, trace, warn};
26
/// Hook that lets an application observe or rewrite SIP messages handled by the endpoint.
///
/// The endpoint stores an optional boxed inspector; because that box is shared
/// through an `Arc`, implementations must be `Send + Sync`.
pub trait MessageInspector: Send + Sync {
    /// Called with a message that is about to be sent; the returned message is
    /// the one that is actually written to the connection.
    fn before_send(&self, msg: SipMessage) -> SipMessage;
    /// Called with a message that was received; the returned message is the one
    /// that is passed on to transaction processing.
    fn after_received(&self, msg: SipMessage) -> SipMessage;
}
31
/// Tunable settings of an endpoint.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate should start
/// from [`EndpointOption::default`] and overwrite individual fields.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct EndpointOption {
    /// Base timer `T1` (defaults to 500 ms).
    /// NOTE(review): presumably the RFC 3261 T1 round-trip estimate — confirm.
    pub t1: Duration,
    /// Timer `T4` (defaults to 4 s).
    /// NOTE(review): presumably the RFC 3261 T4 value — confirm.
    pub t4: Duration,
    /// `64 * T1` (defaults to 32 s). `EndpointInner::detach_transaction` uses it
    /// as the time a finished transaction's last message stays cached.
    pub t1x64: Duration,
    /// Timer C (defaults to 180 s). Not read in this file.
    pub timerc: Duration,
    /// Defaults to `true`. Not read in this file; judging by the name it
    /// controls handling of OPTIONS requests outside a dialog — confirm.
    pub ignore_out_of_dialog_option: bool,
    /// Optional suffix for generated Call-IDs (defaults to `None`). Not read in
    /// this file.
    pub callid_suffix: Option<String>,
    /// Optional dialog keep-alive period (defaults to 60 s). Not read in this file.
    pub dialog_keepalive_duration: Option<Duration>,
    /// Defaults to `true`; can also be set through
    /// `EndpointBuilder::follow_record_route`. Not read in this file.
    pub follow_record_route: bool,
}

impl Default for EndpointOption {
    /// Returns the stock settings; `t1x64` is derived from `t1` so the two
    /// cannot drift apart.
    fn default() -> Self {
        let t1 = Duration::from_millis(500);
        EndpointOption {
            t1,
            t4: Duration::from_secs(4),
            t1x64: t1 * 64,
            timerc: Duration::from_secs(180),
            ignore_out_of_dialog_option: true,
            callid_suffix: None,
            dialog_keepalive_duration: Some(Duration::from_secs(60)),
            follow_record_route: true,
        }
    }
}
58
/// Point-in-time counters describing the endpoint's transaction tables.
///
/// Produced by `EndpointInner::get_stats`; every field is the number of entries
/// in the table of the same name at the moment it was read.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct EndpointStats {
    /// Number of transactions currently registered with the endpoint.
    pub running_transactions: usize,
    /// Number of finished transactions still held in the cache.
    pub finished_transactions: usize,
    /// Number of dialogs with a transaction still waiting for its ACK.
    pub waiting_ack: usize,
}
64
/// Shared state of a SIP endpoint: transport, timers and the transaction tables.
///
/// Instances are created through [`EndpointInner::new`], which returns the value
/// already wrapped in an `Arc` (see [`EndpointInnerRef`]).
pub struct EndpointInner {
    /// Methods supplied at construction time; stored as `Some(list)` behind a mutex.
    pub allows: Mutex<Option<Vec<rsip::Method>>>,
    /// User-agent string supplied at construction time.
    pub user_agent: String,
    /// Timer queue polled by `process_timer`.
    pub timers: Timer<TransactionTimer>,
    /// Transport layer that delivers incoming messages and provides local addresses.
    pub transport_layer: TransportLayer,
    /// Finished transactions and the last message each of them produced; the
    /// cached message is re-sent when a matching message arrives again.
    pub finished_transactions: RwLock<HashMap<TransactionKey, Option<SipMessage>>>,
    /// Running transactions, keyed by transaction key, with the channel used to
    /// deliver events to each of them.
    pub transactions: RwLock<HashMap<TransactionKey, TransactionEventSender>>,
    /// Maps a dialog to the transaction key an incoming ACK for that dialog is
    /// redirected to.
    pub waiting_ack: RwLock<HashMap<DialogId, TransactionKey>>,
    /// Sending half of the channel carrying newly created server transactions.
    incoming_sender: TransactionSender,
    /// Receiving half of that channel; `Endpoint::incoming_transactions` takes it
    /// out, so it can be claimed only once.
    incoming_receiver: Mutex<Option<TransactionReceiver>>,
    /// Cancelling this token makes `serve` return.
    cancel_token: CancellationToken,
    /// Period between two polls of `timers`.
    timer_interval: Duration,
    /// Optional hook applied to received messages and to responses generated here.
    pub(super) inspector: Option<Box<dyn MessageInspector>>,
    /// Endpoint settings.
    pub option: EndpointOption,
}
/// Reference-counted handle to an [`EndpointInner`].
pub type EndpointInnerRef = Arc<EndpointInner>;
113
/// Collects the pieces needed to construct an [`Endpoint`].
///
/// Every optional piece left unset is replaced by a default when `build` runs.
pub struct EndpointBuilder {
    /// Methods passed on to the endpoint's `allows` list.
    allows: Vec<rsip::Method>,
    /// User-agent string; initialised from `VERSION`.
    user_agent: String,
    /// Transport layer to use; `build` creates one when this is `None`.
    transport_layer: Option<TransportLayer>,
    /// Cancellation token to use; `build` creates a default one when this is `None`.
    cancel_token: Option<CancellationToken>,
    /// Timer polling period; `None` lets the endpoint pick its default.
    timer_interval: Option<Duration>,
    /// Endpoint settings; `None` lets the endpoint use `EndpointOption::default()`.
    option: Option<EndpointOption>,
    /// Optional message inspector handed to the endpoint.
    inspector: Option<Box<dyn MessageInspector>>,
}
141
/// Public handle to a SIP endpoint; a thin wrapper around the shared inner state.
pub struct Endpoint {
    /// Shared endpoint state.
    pub inner: EndpointInnerRef,
}
196
197impl EndpointInner {
198 pub fn new(
199 user_agent: String,
200 transport_layer: TransportLayer,
201 cancel_token: CancellationToken,
202 timer_interval: Option<Duration>,
203 allows: Vec<rsip::Method>,
204 option: Option<EndpointOption>,
205 inspector: Option<Box<dyn MessageInspector>>,
206 ) -> Arc<Self> {
207 let (incoming_sender, incoming_receiver) = unbounded_channel();
208 Arc::new(EndpointInner {
209 allows: Mutex::new(Some(allows)),
210 user_agent,
211 timers: Timer::new(),
212 transport_layer,
213 transactions: RwLock::new(HashMap::new()),
214 finished_transactions: RwLock::new(HashMap::new()),
215 waiting_ack: RwLock::new(HashMap::new()),
216 timer_interval: timer_interval.unwrap_or(Duration::from_millis(20)),
217 cancel_token,
218 incoming_sender,
219 incoming_receiver: Mutex::new(Some(incoming_receiver)),
220 option: option.unwrap_or_default(),
221 inspector,
222 })
223 }
224
225 pub async fn serve(self: &Arc<Self>) -> Result<()> {
226 select! {
227 _ = self.cancel_token.cancelled() => {
228 },
229 r = self.process_timer() => {
230 _ = r?
231 },
232 r = self.clone().process_transport_layer() => {
233 _ = r?
234 },
235 }
236 Ok(())
237 }
238
239 async fn process_transport_layer(self: Arc<Self>) -> Result<()> {
241 self.transport_layer.serve_listens().await.ok();
242
243 let mut transport_rx = match self
244 .transport_layer
245 .inner
246 .transport_rx
247 .lock()
248 .unwrap()
249 .take()
250 {
251 Some(rx) => rx,
252 None => {
253 return Err(Error::EndpointError("transport_rx not set".to_string()));
254 }
255 };
256
257 while let Some(event) = transport_rx.recv().await {
258 trace!(?event, "endpoint received transport event");
259 match event {
260 TransportEvent::Incoming(msg, connection, from) => {
261 match self.on_received_message(msg, connection).await {
262 Ok(()) => {
263 trace!(addr = %from, "on_received_message completed");
264 }
265 Err(e) => {
266 warn!(addr = %from, "on_received_message error: {}", e);
267 }
268 }
269 }
270 TransportEvent::New(t) => {
271 info!(addr=%t.get_addr(), "new connection");
272 }
273 TransportEvent::Closed(t) => {
274 info!(addr=%t.get_addr(), "closed connection");
275 }
276 }
277 }
278
279 warn!("transport_rx closed");
280 Ok(())
281 }
282
283 pub async fn process_timer(&self) -> Result<()> {
284 let mut ticker = tokio::time::interval(self.timer_interval);
285 loop {
286 for t in self.timers.poll(Instant::now()) {
287 match t {
288 TransactionTimer::TimerCleanup(key) => {
289 trace!(%key, "TimerCleanup");
290 self.transactions
291 .write()
292 .as_mut()
293 .map(|ts| ts.remove(&key))
294 .ok();
295 self.finished_transactions
296 .write()
297 .as_mut()
298 .map(|t| t.remove(&key))
299 .ok();
300 continue;
301 }
302 _ => {}
303 }
304
305 if let Ok(Some(tu)) =
306 { self.transactions.read().as_ref().map(|ts| ts.get(&t.key())) }
307 {
308 match tu.send(TransactionEvent::Timer(t)) {
309 Ok(_) => {}
310 Err(error::SendError(t)) => match t {
311 TransactionEvent::Timer(t) => {
312 self.detach_transaction(t.key(), None);
313 }
314 _ => {}
315 },
316 }
317 }
318 }
319 ticker.tick().await;
320 }
321 }
322
323 pub async fn on_received_message(
325 self: &Arc<Self>,
326 msg: SipMessage,
327 connection: SipConnection,
328 ) -> Result<()> {
329 let mut key = match &msg {
330 SipMessage::Request(req) => {
331 TransactionKey::from_request(req, super::key::TransactionRole::Server)?
332 }
333 SipMessage::Response(resp) => {
334 TransactionKey::from_response(resp, super::key::TransactionRole::Client)?
335 }
336 };
337 match &msg {
338 SipMessage::Request(req) => {
339 match req.method() {
340 rsip::Method::Ack => match DialogId::try_from(req) {
341 Ok(dialog_id) => {
342 let tx_key = self
343 .waiting_ack
344 .read()
345 .map(|wa| wa.get(&dialog_id).cloned());
346 if let Ok(Some(tx_key)) = tx_key {
347 key = tx_key;
348 }
349 }
350 Err(_) => {}
351 },
352 _ => {}
353 }
354 let last_message = self
356 .finished_transactions
357 .read()
358 .unwrap()
359 .get(&key)
360 .cloned()
361 .flatten();
362
363 if let Some(last_message) = last_message {
364 connection.send(last_message, None).await?;
365 return Ok(());
366 }
367 }
368 SipMessage::Response(resp) => {
369 let last_message = self
370 .finished_transactions
371 .read()
372 .unwrap()
373 .get(&key)
374 .cloned()
375 .flatten();
376
377 if let Some(mut last_message) = last_message {
378 match last_message {
379 SipMessage::Request(ref mut last_req) => {
380 if last_req.method() == &rsip::Method::Ack {
381 match resp.status_code.kind() {
382 rsip::StatusCodeKind::Provisional => {
383 return Ok(());
384 }
385 rsip::StatusCodeKind::Successful => {
386 if last_req.to_header()?.tag().ok().is_none() {
387 return Ok(());
389 }
390 }
391 _ => {}
392 }
393 if let Ok(Some(tag)) = resp.to_header()?.tag() {
394 last_req.to_header_mut().and_then(|h| h.mut_tag(tag)).ok();
395 }
396 }
397 }
398 _ => {}
399 }
400 connection.send(last_message, None).await?;
401 return Ok(());
402 }
403 }
404 };
405
406 let msg = if let Some(inspector) = &self.inspector {
407 inspector.after_received(msg)
408 } else {
409 msg
410 };
411
412 if let Some(tu) = self.transactions.read().unwrap().get(&key) {
413 tu.send(TransactionEvent::Received(msg, Some(connection)))
414 .map_err(|e| Error::TransactionError(e.to_string(), key))?;
415 return Ok(());
416 }
417 let request = match msg {
419 SipMessage::Request(req) => req,
420 SipMessage::Response(resp) => {
421 if resp.cseq_header()?.method()? != rsip::Method::Cancel {
422 debug!(%key, "the transaction is not exist {}", resp);
423 }
424 return Ok(());
425 }
426 };
427
428 match request.method {
429 rsip::Method::Cancel => {
430 let resp = self.make_response(
431 &request,
432 rsip::StatusCode::CallTransactionDoesNotExist,
433 None,
434 );
435 let resp = if let Some(ref inspector) = self.inspector {
436 inspector.before_send(resp.into())
437 } else {
438 resp.into()
439 };
440 connection.send(resp, None).await?;
441 return Ok(());
442 }
443 rsip::Method::Ack => return Ok(()),
444 _ => {}
445 }
446
447 let tx =
448 Transaction::new_server(key.clone(), request.clone(), self.clone(), Some(connection));
449
450 self.incoming_sender.send(tx).ok();
451 Ok(())
452 }
453
454 pub fn attach_transaction(&self, key: &TransactionKey, tu_sender: TransactionEventSender) {
455 trace!(%key, "attach transaction");
456 self.transactions
457 .write()
458 .as_mut()
459 .map(|ts| ts.insert(key.clone(), tu_sender))
460 .ok();
461 }
462
463 pub fn detach_transaction(&self, key: &TransactionKey, last_message: Option<SipMessage>) {
464 trace!(%key, "detach transaction");
465 self.transactions
466 .write()
467 .as_mut()
468 .map(|ts| ts.remove(key))
469 .ok();
470
471 if let Some(msg) = last_message {
472 self.timers.timeout(
473 self.option.t1x64,
474 TransactionTimer::TimerCleanup(key.clone()), );
476
477 self.finished_transactions
478 .write()
479 .as_mut()
480 .map(|ft| ft.insert(key.clone(), Some(msg)))
481 .ok();
482 }
483 }
484
485 pub fn get_addrs(&self) -> Vec<SipAddr> {
486 self.transport_layer.get_addrs()
487 }
488
489 pub fn get_record_route(&self) -> Result<rsip::typed::RecordRoute> {
490 let first_addr = self
491 .transport_layer
492 .get_addrs()
493 .first()
494 .ok_or(Error::EndpointError("not sipaddrs".to_string()))
495 .cloned()?;
496 let rr = rsip::UriWithParamsList(vec![rsip::UriWithParams {
497 uri: first_addr.into(),
498 params: vec![rsip::Param::Other("lr".into(), None)],
499 }]);
500 Ok(rr.into())
501 }
502
503 pub fn get_via(
504 &self,
505 addr: Option<crate::transport::SipAddr>,
506 branch: Option<rsip::Param>,
507 ) -> Result<rsip::typed::Via> {
508 let first_addr = match addr {
509 Some(addr) => addr,
510 None => self
511 .transport_layer
512 .get_addrs()
513 .first()
514 .ok_or(Error::EndpointError("not sipaddrs".to_string()))
515 .cloned()?,
516 };
517
518 let via = rsip::typed::Via {
519 version: rsip::Version::V2,
520 transport: first_addr.r#type.unwrap_or_default(),
521 uri: first_addr.addr.into(),
522 params: vec![
523 branch.unwrap_or_else(make_via_branch),
524 rsip::Param::Other("rport".into(), None),
525 ],
526 };
527 Ok(via)
528 }
529
530 pub fn get_stats(&self) -> EndpointStats {
531 let waiting_ack = self
532 .waiting_ack
533 .read()
534 .map(|wa| wa.len())
535 .unwrap_or_default();
536 let running_transactions = self
537 .transactions
538 .read()
539 .map(|ts| ts.len())
540 .unwrap_or_default();
541 let finished_transactions = self
542 .finished_transactions
543 .read()
544 .map(|ft| ft.len())
545 .unwrap_or_default();
546
547 EndpointStats {
548 running_transactions,
549 finished_transactions,
550 waiting_ack,
551 }
552 }
553}
554
555impl EndpointBuilder {
556 pub fn new() -> Self {
557 EndpointBuilder {
558 allows: Vec::new(),
559 user_agent: VERSION.to_string(),
560 transport_layer: None,
561 cancel_token: None,
562 timer_interval: None,
563 option: None,
564 inspector: None,
565 }
566 }
567 pub fn with_option(&mut self, option: EndpointOption) -> &mut Self {
568 self.option = Some(option);
569 self
570 }
571 pub fn with_user_agent(&mut self, user_agent: &str) -> &mut Self {
572 self.user_agent = user_agent.to_string();
573 self
574 }
575
576 pub fn with_transport_layer(&mut self, transport_layer: TransportLayer) -> &mut Self {
577 self.transport_layer.replace(transport_layer);
578 self
579 }
580
581 pub fn with_cancel_token(&mut self, cancel_token: CancellationToken) -> &mut Self {
582 self.cancel_token.replace(cancel_token);
583 self
584 }
585
586 pub fn with_timer_interval(&mut self, timer_interval: Duration) -> &mut Self {
587 self.timer_interval.replace(timer_interval);
588 self
589 }
590 pub fn with_allows(&mut self, allows: Vec<rsip::Method>) -> &mut Self {
591 self.allows = allows;
592 self
593 }
594 pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
595 self.inspector = Some(inspector);
596 self
597 }
598 pub fn follow_record_route(&mut self, enabled: bool) -> &mut Self {
599 self.option
600 .get_or_insert_with(EndpointOption::default)
601 .follow_record_route = enabled;
602 self
603 }
604 pub fn build(&mut self) -> Endpoint {
605 let cancel_token = self.cancel_token.take().unwrap_or_default();
606
607 let transport_layer = self
608 .transport_layer
609 .take()
610 .unwrap_or(TransportLayer::new(cancel_token.child_token()));
611
612 let allows = self.allows.to_owned();
613 let user_agent = self.user_agent.to_owned();
614 let timer_interval = self.timer_interval.to_owned();
615 let option = self.option.take();
616 let inspector = self.inspector.take();
617
618 let core = EndpointInner::new(
619 user_agent,
620 transport_layer,
621 cancel_token,
622 timer_interval,
623 allows,
624 option,
625 inspector,
626 );
627
628 Endpoint { inner: core }
629 }
630}
631
632impl Endpoint {
633 pub async fn serve(&self) {
634 let inner = self.inner.clone();
635 match inner.serve().await {
636 Ok(()) => {
637 info!("endpoint shutdown");
638 }
639 Err(e) => {
640 warn!("endpoint serve error: {:?}", e);
641 }
642 }
643 }
644
645 pub fn shutdown(&self) {
646 info!("endpoint shutdown requested");
647 self.inner.cancel_token.cancel();
648 }
649
650 pub fn incoming_transactions(&self) -> Result<TransactionReceiver> {
654 self.inner
655 .incoming_receiver
656 .lock()
657 .unwrap()
658 .take()
659 .ok_or_else(|| Error::EndpointError("incoming recevier taken".to_string()))
660 }
661
662 pub fn get_addrs(&self) -> Vec<SipAddr> {
663 self.inner.transport_layer.get_addrs()
664 }
665}