1use super::{
2 key::TransactionKey,
3 make_via_branch,
4 timer::Timer,
5 transaction::{Transaction, TransactionEvent, TransactionEventSender},
6 SipConnection, TransactionReceiver, TransactionSender, TransactionTimer,
7};
8use crate::rsip;
9use crate::{
10 dialog::DialogId,
11 transport::{SipAddr, TransportEvent, TransportLayer},
12 Error, Result, VERSION,
13};
14use rsip::{prelude::HeadersExt, SipMessage};
15use std::{
16 collections::HashMap,
17 sync::{Arc, Mutex, RwLock},
18 time::{Duration, Instant},
19};
20use tokio::{
21 select,
22 sync::mpsc::{error, unbounded_channel},
23};
24use tokio_util::sync::CancellationToken;
25use tracing::{debug, info, trace, warn};
26
27pub trait MessageInspector: Send + Sync {
28 fn before_send(&self, msg: SipMessage) -> SipMessage;
29 fn after_received(&self, msg: SipMessage) -> SipMessage;
30}
31
32#[non_exhaustive]
33pub struct EndpointOption {
34 pub t1: Duration,
35 pub t4: Duration,
36 pub t1x64: Duration,
37 pub timerc: Duration,
38 pub ignore_out_of_dialog_option: bool,
39 pub callid_suffix: Option<String>,
40 pub dialog_keepalive_duration: Option<Duration>,
41 pub follow_record_route: bool,
42}
43
44impl Default for EndpointOption {
45 fn default() -> Self {
46 EndpointOption {
47 t1: Duration::from_millis(500),
48 t4: Duration::from_secs(4),
49 t1x64: Duration::from_millis(64 * 500),
50 timerc: Duration::from_secs(180),
51 ignore_out_of_dialog_option: true,
52 callid_suffix: None,
53 dialog_keepalive_duration: Some(Duration::from_secs(60)),
54 follow_record_route: true,
55 }
56 }
57}
58
59pub struct EndpointStats {
60 pub running_transactions: usize,
61 pub finished_transactions: usize,
62 pub waiting_ack: usize,
63}
64
65pub struct EndpointInner {
98 pub allows: Mutex<Option<Vec<rsip::Method>>>,
99 pub user_agent: String,
100 pub timers: Timer<TransactionTimer>,
101 pub transport_layer: TransportLayer,
102 pub finished_transactions: RwLock<HashMap<TransactionKey, Option<SipMessage>>>,
103 pub transactions: RwLock<HashMap<TransactionKey, TransactionEventSender>>,
104 pub waiting_ack: RwLock<HashMap<DialogId, TransactionKey>>,
105 incoming_sender: TransactionSender,
106 incoming_receiver: Mutex<Option<TransactionReceiver>>,
107 cancel_token: CancellationToken,
108 timer_interval: Duration,
109 pub(super) inspector: Option<Box<dyn MessageInspector>>,
110 pub option: EndpointOption,
111}
112pub type EndpointInnerRef = Arc<EndpointInner>;
113
114pub struct EndpointBuilder {
133 allows: Vec<rsip::Method>,
134 user_agent: String,
135 transport_layer: Option<TransportLayer>,
136 cancel_token: Option<CancellationToken>,
137 timer_interval: Option<Duration>,
138 option: Option<EndpointOption>,
139 inspector: Option<Box<dyn MessageInspector>>,
140}
141
142pub struct Endpoint {
194 pub inner: EndpointInnerRef,
195}
196
197impl EndpointInner {
198 pub fn new(
199 user_agent: String,
200 transport_layer: TransportLayer,
201 cancel_token: CancellationToken,
202 timer_interval: Option<Duration>,
203 allows: Vec<rsip::Method>,
204 option: Option<EndpointOption>,
205 inspector: Option<Box<dyn MessageInspector>>,
206 ) -> Arc<Self> {
207 let (incoming_sender, incoming_receiver) = unbounded_channel();
208 Arc::new(EndpointInner {
209 allows: Mutex::new(Some(allows)),
210 user_agent,
211 timers: Timer::new(),
212 transport_layer,
213 transactions: RwLock::new(HashMap::new()),
214 finished_transactions: RwLock::new(HashMap::new()),
215 waiting_ack: RwLock::new(HashMap::new()),
216 timer_interval: timer_interval.unwrap_or(Duration::from_millis(20)),
217 cancel_token,
218 incoming_sender,
219 incoming_receiver: Mutex::new(Some(incoming_receiver)),
220 option: option.unwrap_or_default(),
221 inspector,
222 })
223 }
224
225 pub async fn serve(self: &Arc<Self>) -> Result<()> {
226 select! {
227 _ = self.cancel_token.cancelled() => {
228 },
229 r = self.process_timer() => {
230 _ = r?
231 },
232 r = self.clone().process_transport_layer() => {
233 _ = r?
234 },
235 }
236 Ok(())
237 }
238
239 async fn process_transport_layer(self: Arc<Self>) -> Result<()> {
241 self.transport_layer.serve_listens().await.ok();
242
243 let mut transport_rx = match self
244 .transport_layer
245 .inner
246 .transport_rx
247 .lock()
248 .unwrap()
249 .take()
250 {
251 Some(rx) => rx,
252 None => {
253 return Err(Error::EndpointError("transport_rx not set".to_string()));
254 }
255 };
256
257 while let Some(event) = transport_rx.recv().await {
258 match event {
259 TransportEvent::Incoming(msg, connection, from) => {
260 match self.on_received_message(msg, connection).await {
261 Ok(()) => {}
262 Err(e) => {
263 warn!(addr=%from,"on_received_message error: {}", e);
264 }
265 }
266 }
267 TransportEvent::New(t) => {
268 info!(addr=%t.get_addr(), "new connection");
269 }
270 TransportEvent::Closed(t) => {
271 info!(addr=%t.get_addr(), "closed connection");
272 }
273 }
274 }
275 Ok(())
276 }
277
278 pub async fn process_timer(&self) -> Result<()> {
279 let mut ticker = tokio::time::interval(self.timer_interval);
280 loop {
281 for t in self.timers.poll(Instant::now()) {
282 match t {
283 TransactionTimer::TimerCleanup(key) => {
284 trace!(%key, "TimerCleanup");
285 self.transactions
286 .write()
287 .as_mut()
288 .map(|ts| ts.remove(&key))
289 .ok();
290 self.finished_transactions
291 .write()
292 .as_mut()
293 .map(|t| t.remove(&key))
294 .ok();
295 continue;
296 }
297 _ => {}
298 }
299
300 if let Ok(Some(tu)) =
301 { self.transactions.read().as_ref().map(|ts| ts.get(&t.key())) }
302 {
303 match tu.send(TransactionEvent::Timer(t)) {
304 Ok(_) => {}
305 Err(error::SendError(t)) => match t {
306 TransactionEvent::Timer(t) => {
307 self.detach_transaction(t.key(), None);
308 }
309 _ => {}
310 },
311 }
312 }
313 }
314 ticker.tick().await;
315 }
316 }
317
318 pub async fn on_received_message(
320 self: &Arc<Self>,
321 msg: SipMessage,
322 connection: SipConnection,
323 ) -> Result<()> {
324 let mut key = match &msg {
325 SipMessage::Request(req) => {
326 TransactionKey::from_request(req, super::key::TransactionRole::Server)?
327 }
328 SipMessage::Response(resp) => {
329 TransactionKey::from_response(resp, super::key::TransactionRole::Client)?
330 }
331 };
332 match &msg {
333 SipMessage::Request(req) => {
334 match req.method() {
335 rsip::Method::Ack => match DialogId::try_from(req) {
336 Ok(dialog_id) => {
337 let tx_key = self
338 .waiting_ack
339 .read()
340 .map(|wa| wa.get(&dialog_id).cloned());
341 if let Ok(Some(tx_key)) = tx_key {
342 key = tx_key;
343 }
344 }
345 Err(_) => {}
346 },
347 _ => {}
348 }
349 let last_message = self
351 .finished_transactions
352 .read()
353 .unwrap()
354 .get(&key)
355 .cloned()
356 .flatten();
357
358 if let Some(last_message) = last_message {
359 connection.send(last_message, None).await?;
360 return Ok(());
361 }
362 }
363 SipMessage::Response(resp) => {
364 let last_message = self
365 .finished_transactions
366 .read()
367 .unwrap()
368 .get(&key)
369 .cloned()
370 .flatten();
371
372 if let Some(mut last_message) = last_message {
373 match last_message {
374 SipMessage::Request(ref mut last_req) => {
375 if last_req.method() == &rsip::Method::Ack {
376 match resp.status_code.kind() {
377 rsip::StatusCodeKind::Provisional => {
378 return Ok(());
379 }
380 rsip::StatusCodeKind::Successful => {
381 if last_req.to_header()?.tag().ok().is_none() {
382 return Ok(());
384 }
385 }
386 _ => {}
387 }
388 if let Ok(Some(tag)) = resp.to_header()?.tag() {
389 last_req.to_header_mut().and_then(|h| h.mut_tag(tag)).ok();
390 }
391 }
392 }
393 _ => {}
394 }
395 connection.send(last_message, None).await?;
396 return Ok(());
397 }
398 }
399 };
400
401 let msg = if let Some(inspector) = &self.inspector {
402 inspector.after_received(msg)
403 } else {
404 msg
405 };
406
407 if let Some(tu) = self.transactions.read().unwrap().get(&key) {
408 tu.send(TransactionEvent::Received(msg, Some(connection)))
409 .map_err(|e| Error::TransactionError(e.to_string(), key))?;
410 return Ok(());
411 }
412 let request = match msg {
414 SipMessage::Request(req) => req,
415 SipMessage::Response(resp) => {
416 if resp.cseq_header()?.method()? != rsip::Method::Cancel {
417 debug!(%key, "the transaction is not exist {}", resp);
418 }
419 return Ok(());
420 }
421 };
422
423 match request.method {
424 rsip::Method::Cancel => {
425 let resp = self.make_response(
426 &request,
427 rsip::StatusCode::CallTransactionDoesNotExist,
428 None,
429 );
430 let resp = if let Some(ref inspector) = self.inspector {
431 inspector.before_send(resp.into())
432 } else {
433 resp.into()
434 };
435 connection.send(resp, None).await?;
436 return Ok(());
437 }
438 rsip::Method::Ack => return Ok(()),
439 _ => {}
440 }
441
442 let tx =
443 Transaction::new_server(key.clone(), request.clone(), self.clone(), Some(connection));
444
445 self.incoming_sender.send(tx).ok();
446 Ok(())
447 }
448
449 pub fn attach_transaction(&self, key: &TransactionKey, tu_sender: TransactionEventSender) {
450 trace!(%key, "attach transaction");
451 self.transactions
452 .write()
453 .as_mut()
454 .map(|ts| ts.insert(key.clone(), tu_sender))
455 .ok();
456 }
457
458 pub fn detach_transaction(&self, key: &TransactionKey, last_message: Option<SipMessage>) {
459 trace!(%key, "detach transaction");
460 self.transactions
461 .write()
462 .as_mut()
463 .map(|ts| ts.remove(key))
464 .ok();
465
466 if let Some(msg) = last_message {
467 self.timers.timeout(
468 self.option.t1x64,
469 TransactionTimer::TimerCleanup(key.clone()), );
471
472 self.finished_transactions
473 .write()
474 .as_mut()
475 .map(|ft| ft.insert(key.clone(), Some(msg)))
476 .ok();
477 }
478 }
479
480 pub fn get_addrs(&self) -> Vec<SipAddr> {
481 self.transport_layer.get_addrs()
482 }
483
484 pub fn get_record_route(&self) -> Result<rsip::typed::RecordRoute> {
485 let first_addr = self
486 .transport_layer
487 .get_addrs()
488 .first()
489 .ok_or(Error::EndpointError("not sipaddrs".to_string()))
490 .cloned()?;
491 let rr = rsip::UriWithParamsList(vec![rsip::UriWithParams {
492 uri: first_addr.into(),
493 params: vec![rsip::Param::Other("lr".into(), None)],
494 }]);
495 Ok(rr.into())
496 }
497
498 pub fn get_via(
499 &self,
500 addr: Option<crate::transport::SipAddr>,
501 branch: Option<rsip::Param>,
502 ) -> Result<rsip::typed::Via> {
503 let first_addr = match addr {
504 Some(addr) => addr,
505 None => self
506 .transport_layer
507 .get_addrs()
508 .first()
509 .ok_or(Error::EndpointError("not sipaddrs".to_string()))
510 .cloned()?,
511 };
512
513 let via = rsip::typed::Via {
514 version: rsip::Version::V2,
515 transport: first_addr.r#type.unwrap_or_default(),
516 uri: first_addr.addr.into(),
517 params: vec![
518 branch.unwrap_or_else(make_via_branch),
519 rsip::Param::Other("rport".into(), None),
520 ],
521 };
522 Ok(via)
523 }
524
525 pub fn get_stats(&self) -> EndpointStats {
526 let waiting_ack = self
527 .waiting_ack
528 .read()
529 .map(|wa| wa.len())
530 .unwrap_or_default();
531 let running_transactions = self
532 .transactions
533 .read()
534 .map(|ts| ts.len())
535 .unwrap_or_default();
536 let finished_transactions = self
537 .finished_transactions
538 .read()
539 .map(|ft| ft.len())
540 .unwrap_or_default();
541
542 EndpointStats {
543 running_transactions,
544 finished_transactions,
545 waiting_ack,
546 }
547 }
548}
549
550impl EndpointBuilder {
551 pub fn new() -> Self {
552 EndpointBuilder {
553 allows: Vec::new(),
554 user_agent: VERSION.to_string(),
555 transport_layer: None,
556 cancel_token: None,
557 timer_interval: None,
558 option: None,
559 inspector: None,
560 }
561 }
562 pub fn with_option(&mut self, option: EndpointOption) -> &mut Self {
563 self.option = Some(option);
564 self
565 }
566 pub fn with_user_agent(&mut self, user_agent: &str) -> &mut Self {
567 self.user_agent = user_agent.to_string();
568 self
569 }
570
571 pub fn with_transport_layer(&mut self, transport_layer: TransportLayer) -> &mut Self {
572 self.transport_layer.replace(transport_layer);
573 self
574 }
575
576 pub fn with_cancel_token(&mut self, cancel_token: CancellationToken) -> &mut Self {
577 self.cancel_token.replace(cancel_token);
578 self
579 }
580
581 pub fn with_timer_interval(&mut self, timer_interval: Duration) -> &mut Self {
582 self.timer_interval.replace(timer_interval);
583 self
584 }
585 pub fn with_allows(&mut self, allows: Vec<rsip::Method>) -> &mut Self {
586 self.allows = allows;
587 self
588 }
589 pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
590 self.inspector = Some(inspector);
591 self
592 }
593 pub fn follow_record_route(&mut self, enabled: bool) -> &mut Self {
594 self.option
595 .get_or_insert_with(EndpointOption::default)
596 .follow_record_route = enabled;
597 self
598 }
599 pub fn build(&mut self) -> Endpoint {
600 let cancel_token = self.cancel_token.take().unwrap_or_default();
601
602 let transport_layer = self
603 .transport_layer
604 .take()
605 .unwrap_or(TransportLayer::new(cancel_token.child_token()));
606
607 let allows = self.allows.to_owned();
608 let user_agent = self.user_agent.to_owned();
609 let timer_interval = self.timer_interval.to_owned();
610 let option = self.option.take();
611 let inspector = self.inspector.take();
612
613 let core = EndpointInner::new(
614 user_agent,
615 transport_layer,
616 cancel_token,
617 timer_interval,
618 allows,
619 option,
620 inspector,
621 );
622
623 Endpoint { inner: core }
624 }
625}
626
627impl Endpoint {
628 pub async fn serve(&self) {
629 let inner = self.inner.clone();
630 match inner.serve().await {
631 Ok(()) => {
632 info!("endpoint shutdown");
633 }
634 Err(e) => {
635 warn!("endpoint serve error: {:?}", e);
636 }
637 }
638 }
639
640 pub fn shutdown(&self) {
641 info!("endpoint shutdown requested");
642 self.inner.cancel_token.cancel();
643 }
644
645 pub fn incoming_transactions(&self) -> Result<TransactionReceiver> {
649 self.inner
650 .incoming_receiver
651 .lock()
652 .unwrap()
653 .take()
654 .ok_or_else(|| Error::EndpointError("incoming recevier taken".to_string()))
655 }
656
657 pub fn get_addrs(&self) -> Vec<SipAddr> {
658 self.inner.transport_layer.get_addrs()
659 }
660}