1use super::{
2 key::TransactionKey,
3 make_via_branch,
4 timer::Timer,
5 transaction::{Transaction, TransactionEvent, TransactionEventSender},
6 SipConnection, TransactionReceiver, TransactionSender, TransactionTimer,
7};
8use crate::rsip;
9use crate::{
10 dialog::DialogId,
11 transport::{SipAddr, TransportEvent, TransportLayer},
12 Error, Result, VERSION,
13};
14use rsip::{prelude::HeadersExt, SipMessage};
15use std::{
16 collections::HashMap,
17 sync::{Arc, Mutex, RwLock},
18 time::{Duration, Instant},
19};
20use tokio::{
21 select,
22 sync::mpsc::{error, unbounded_channel},
23};
24use tokio_util::sync::CancellationToken;
25use tracing::{debug, info, trace, warn};
26
27pub trait MessageInspector: Send + Sync {
28 fn before_send(&self, msg: SipMessage) -> SipMessage;
29 fn after_received(&self, msg: SipMessage) -> SipMessage;
30}
31
/// Tunable settings of an endpoint.
///
/// The values produced by [`Default`] are listed on each field.
pub struct EndpointOption {
    /// Base timer interval; defaults to 500 ms.
    /// Presumably SIP timer T1 — confirm against the transaction module.
    pub t1: Duration,
    /// Defaults to 4 s. Presumably SIP timer T4 — confirm against the
    /// transaction module.
    pub t4: Duration,
    /// 64 times `t1` in the default configuration (32 s).
    pub t1x64: Duration,
    /// Defaults to 180 s. Presumably SIP timer C — confirm against the
    /// transaction module.
    pub timerc: Duration,
    /// Defaults to `true`. Presumably controls whether OPTIONS requests
    /// outside a dialog are ignored — verify against the consumer.
    pub ignore_out_of_dialog_option: bool,
    /// Optional suffix for generated Call-IDs (assumed from the name —
    /// TODO confirm); defaults to `None`.
    pub callid_suffix: Option<String>,
    /// Optional dialog keep-alive period; defaults to 60 s.
    pub dialog_keepalive_duration: Option<Duration>,
}

impl Default for EndpointOption {
    /// Builds the stock configuration: `t1` = 500 ms, `t4` = 4 s,
    /// `t1x64` = 64 x `t1`, `timerc` = 180 s, out-of-dialog OPTIONS
    /// ignored, no Call-ID suffix and a 60 s dialog keep-alive.
    fn default() -> Self {
        let t1 = Duration::from_millis(500);
        Self {
            t1,
            t4: Duration::from_secs(4),
            t1x64: t1 * 64,
            timerc: Duration::from_secs(180),
            ignore_out_of_dialog_option: true,
            callid_suffix: None,
            dialog_keepalive_duration: Some(Duration::from_secs(60)),
        }
    }
}
55
/// Point-in-time sizes of the endpoint's internal tables, as reported by
/// `EndpointInner::get_stats`.
pub struct EndpointStats {
    /// Number of entries in the table of attached (running) transactions.
    pub running_transactions: usize,
    /// Number of entries in the table of finished transactions.
    pub finished_transactions: usize,
    /// Number of entries in the table of dialogs waiting for an ACK.
    pub waiting_ack: usize,
}
61
62pub struct EndpointInner {
95 pub allows: Mutex<Option<Vec<rsip::Method>>>,
96 pub user_agent: String,
97 pub timers: Timer<TransactionTimer>,
98 pub transport_layer: TransportLayer,
99 pub finished_transactions: RwLock<HashMap<TransactionKey, Option<SipMessage>>>,
100 pub transactions: RwLock<HashMap<TransactionKey, TransactionEventSender>>,
101 pub waiting_ack: RwLock<HashMap<DialogId, TransactionKey>>,
102 incoming_sender: TransactionSender,
103 incoming_receiver: Mutex<Option<TransactionReceiver>>,
104 cancel_token: CancellationToken,
105 timer_interval: Duration,
106 pub(super) inspector: Option<Box<dyn MessageInspector>>,
107 pub option: EndpointOption,
108}
109pub type EndpointInnerRef = Arc<EndpointInner>;
110
111pub struct EndpointBuilder {
130 allows: Vec<rsip::Method>,
131 user_agent: String,
132 transport_layer: Option<TransportLayer>,
133 cancel_token: Option<CancellationToken>,
134 timer_interval: Option<Duration>,
135 option: Option<EndpointOption>,
136 inspector: Option<Box<dyn MessageInspector>>,
137}
138
139pub struct Endpoint {
191 pub inner: EndpointInnerRef,
192}
193
194impl EndpointInner {
195 pub fn new(
196 user_agent: String,
197 transport_layer: TransportLayer,
198 cancel_token: CancellationToken,
199 timer_interval: Option<Duration>,
200 allows: Vec<rsip::Method>,
201 option: Option<EndpointOption>,
202 inspector: Option<Box<dyn MessageInspector>>,
203 ) -> Arc<Self> {
204 let (incoming_sender, incoming_receiver) = unbounded_channel();
205 Arc::new(EndpointInner {
206 allows: Mutex::new(Some(allows)),
207 user_agent,
208 timers: Timer::new(),
209 transport_layer,
210 transactions: RwLock::new(HashMap::new()),
211 finished_transactions: RwLock::new(HashMap::new()),
212 waiting_ack: RwLock::new(HashMap::new()),
213 timer_interval: timer_interval.unwrap_or(Duration::from_millis(20)),
214 cancel_token,
215 incoming_sender,
216 incoming_receiver: Mutex::new(Some(incoming_receiver)),
217 option: option.unwrap_or_default(),
218 inspector,
219 })
220 }
221
222 pub async fn serve(self: &Arc<Self>) -> Result<()> {
223 select! {
224 _ = self.cancel_token.cancelled() => {
225 },
226 r = self.process_timer() => {
227 _ = r?
228 },
229 r = self.clone().process_transport_layer() => {
230 _ = r?
231 },
232 }
233 Ok(())
234 }
235
236 async fn process_transport_layer(self: Arc<Self>) -> Result<()> {
238 self.transport_layer.serve_listens().await.ok();
239
240 let mut transport_rx = match self
241 .transport_layer
242 .inner
243 .transport_rx
244 .lock()
245 .unwrap()
246 .take()
247 {
248 Some(rx) => rx,
249 None => {
250 return Err(Error::EndpointError("transport_rx not set".to_string()));
251 }
252 };
253
254 while let Some(event) = transport_rx.recv().await {
255 match event {
256 TransportEvent::Incoming(msg, connection, from) => {
257 match self.on_received_message(msg, connection).await {
258 Ok(()) => {}
259 Err(e) => {
260 warn!(addr=%from,"on_received_message error: {}", e);
261 }
262 }
263 }
264 TransportEvent::New(t) => {
265 info!(addr=%t.get_addr(), "new connection");
266 }
267 TransportEvent::Closed(t) => {
268 info!(addr=%t.get_addr(), "closed connection");
269 }
270 }
271 }
272 Ok(())
273 }
274
275 pub async fn process_timer(&self) -> Result<()> {
276 let mut ticker = tokio::time::interval(self.timer_interval);
277 loop {
278 for t in self.timers.poll(Instant::now()) {
279 match t {
280 TransactionTimer::TimerCleanup(key) => {
281 trace!(%key, "TimerCleanup");
282 self.transactions
283 .write()
284 .as_mut()
285 .map(|ts| ts.remove(&key))
286 .ok();
287 self.finished_transactions
288 .write()
289 .as_mut()
290 .map(|t| t.remove(&key))
291 .ok();
292 continue;
293 }
294 _ => {}
295 }
296
297 if let Ok(Some(tu)) =
298 { self.transactions.read().as_ref().map(|ts| ts.get(&t.key())) }
299 {
300 match tu.send(TransactionEvent::Timer(t)) {
301 Ok(_) => {}
302 Err(error::SendError(t)) => match t {
303 TransactionEvent::Timer(t) => {
304 self.detach_transaction(t.key(), None);
305 }
306 _ => {}
307 },
308 }
309 }
310 }
311 ticker.tick().await;
312 }
313 }
314
315 pub async fn on_received_message(
317 self: &Arc<Self>,
318 msg: SipMessage,
319 connection: SipConnection,
320 ) -> Result<()> {
321 let mut key = match &msg {
322 SipMessage::Request(req) => {
323 TransactionKey::from_request(req, super::key::TransactionRole::Server)?
324 }
325 SipMessage::Response(resp) => {
326 TransactionKey::from_response(resp, super::key::TransactionRole::Client)?
327 }
328 };
329 match &msg {
330 SipMessage::Request(req) => {
331 match req.method() {
332 rsip::Method::Ack => match DialogId::try_from(req) {
333 Ok(dialog_id) => {
334 let tx_key = self
335 .waiting_ack
336 .read()
337 .map(|wa| wa.get(&dialog_id).cloned());
338 if let Ok(Some(tx_key)) = tx_key {
339 key = tx_key;
340 }
341 }
342 Err(_) => {}
343 },
344 _ => {}
345 }
346 let last_message = self
348 .finished_transactions
349 .read()
350 .unwrap()
351 .get(&key)
352 .cloned()
353 .flatten();
354
355 if let Some(last_message) = last_message {
356 connection.send(last_message, None).await?;
357 return Ok(());
358 }
359 }
360 SipMessage::Response(resp) => {
361 let last_message = self
362 .finished_transactions
363 .read()
364 .unwrap()
365 .get(&key)
366 .cloned()
367 .flatten();
368
369 if let Some(mut last_message) = last_message {
370 match last_message {
371 SipMessage::Request(ref mut last_req) => {
372 if last_req.method() == &rsip::Method::Ack {
373 match resp.status_code.kind() {
374 rsip::StatusCodeKind::Provisional => {
375 return Ok(());
376 }
377 rsip::StatusCodeKind::Successful => {
378 if last_req.to_header()?.tag().ok().is_none() {
379 return Ok(());
381 }
382 }
383 _ => {}
384 }
385 if let Ok(Some(tag)) = resp.to_header()?.tag() {
386 last_req.to_header_mut().and_then(|h| h.mut_tag(tag)).ok();
387 }
388 }
389 }
390 _ => {}
391 }
392 connection.send(last_message, None).await?;
393 return Ok(());
394 }
395 }
396 };
397
398 let msg = if let Some(inspector) = &self.inspector {
399 inspector.after_received(msg)
400 } else {
401 msg
402 };
403
404 if let Some(tu) = self.transactions.read().unwrap().get(&key) {
405 tu.send(TransactionEvent::Received(msg, Some(connection)))
406 .map_err(|e| Error::TransactionError(e.to_string(), key))?;
407 return Ok(());
408 }
409 let request = match msg {
411 SipMessage::Request(req) => req,
412 SipMessage::Response(resp) => {
413 if resp.cseq_header()?.method()? != rsip::Method::Cancel {
414 debug!(%key, "the transaction is not exist {}", resp);
415 }
416 return Ok(());
417 }
418 };
419
420 match request.method {
421 rsip::Method::Cancel => {
422 let resp = self.make_response(
423 &request,
424 rsip::StatusCode::CallTransactionDoesNotExist,
425 None,
426 );
427 let resp = if let Some(ref inspector) = self.inspector {
428 inspector.before_send(resp.into())
429 } else {
430 resp.into()
431 };
432 connection.send(resp, None).await?;
433 return Ok(());
434 }
435 rsip::Method::Ack => return Ok(()),
436 _ => {}
437 }
438
439 let tx =
440 Transaction::new_server(key.clone(), request.clone(), self.clone(), Some(connection));
441
442 self.incoming_sender.send(tx).ok();
443 Ok(())
444 }
445
446 pub fn attach_transaction(&self, key: &TransactionKey, tu_sender: TransactionEventSender) {
447 trace!(%key, "attach transaction");
448 self.transactions
449 .write()
450 .as_mut()
451 .map(|ts| ts.insert(key.clone(), tu_sender))
452 .ok();
453 }
454
455 pub fn detach_transaction(&self, key: &TransactionKey, last_message: Option<SipMessage>) {
456 trace!(%key, "detach transaction");
457 self.transactions
458 .write()
459 .as_mut()
460 .map(|ts| ts.remove(key))
461 .ok();
462
463 if let Some(msg) = last_message {
464 self.timers.timeout(
465 self.option.t1x64,
466 TransactionTimer::TimerCleanup(key.clone()), );
468
469 self.finished_transactions
470 .write()
471 .as_mut()
472 .map(|ft| ft.insert(key.clone(), Some(msg)))
473 .ok();
474 }
475 }
476
477 pub fn get_addrs(&self) -> Vec<SipAddr> {
478 self.transport_layer.get_addrs()
479 }
480
481 pub fn get_record_route(&self) -> Result<rsip::typed::RecordRoute> {
482 let first_addr = self
483 .transport_layer
484 .get_addrs()
485 .first()
486 .ok_or(Error::EndpointError("not sipaddrs".to_string()))
487 .cloned()?;
488 let rr = rsip::UriWithParamsList(vec![rsip::UriWithParams {
489 uri: first_addr.into(),
490 params: vec![rsip::Param::Other("lr".into(), None)],
491 }]);
492 Ok(rr.into())
493 }
494
495 pub fn get_via(
496 &self,
497 addr: Option<crate::transport::SipAddr>,
498 branch: Option<rsip::Param>,
499 ) -> Result<rsip::typed::Via> {
500 let first_addr = match addr {
501 Some(addr) => addr,
502 None => self
503 .transport_layer
504 .get_addrs()
505 .first()
506 .ok_or(Error::EndpointError("not sipaddrs".to_string()))
507 .cloned()?,
508 };
509
510 let via = rsip::typed::Via {
511 version: rsip::Version::V2,
512 transport: first_addr.r#type.unwrap_or_default(),
513 uri: first_addr.addr.into(),
514 params: vec![
515 branch.unwrap_or_else(make_via_branch),
516 rsip::Param::Other("rport".into(), None),
517 ],
518 };
519 Ok(via)
520 }
521
522 pub fn get_stats(&self) -> EndpointStats {
523 let waiting_ack = self
524 .waiting_ack
525 .read()
526 .map(|wa| wa.len())
527 .unwrap_or_default();
528 let running_transactions = self
529 .transactions
530 .read()
531 .map(|ts| ts.len())
532 .unwrap_or_default();
533 let finished_transactions = self
534 .finished_transactions
535 .read()
536 .map(|ft| ft.len())
537 .unwrap_or_default();
538
539 EndpointStats {
540 running_transactions,
541 finished_transactions,
542 waiting_ack,
543 }
544 }
545}
546
547impl EndpointBuilder {
548 pub fn new() -> Self {
549 EndpointBuilder {
550 allows: Vec::new(),
551 user_agent: VERSION.to_string(),
552 transport_layer: None,
553 cancel_token: None,
554 timer_interval: None,
555 option: None,
556 inspector: None,
557 }
558 }
559 pub fn with_option(&mut self, option: EndpointOption) -> &mut Self {
560 self.option = Some(option);
561 self
562 }
563 pub fn with_user_agent(&mut self, user_agent: &str) -> &mut Self {
564 self.user_agent = user_agent.to_string();
565 self
566 }
567
568 pub fn with_transport_layer(&mut self, transport_layer: TransportLayer) -> &mut Self {
569 self.transport_layer.replace(transport_layer);
570 self
571 }
572
573 pub fn with_cancel_token(&mut self, cancel_token: CancellationToken) -> &mut Self {
574 self.cancel_token.replace(cancel_token);
575 self
576 }
577
578 pub fn with_timer_interval(&mut self, timer_interval: Duration) -> &mut Self {
579 self.timer_interval.replace(timer_interval);
580 self
581 }
582 pub fn with_allows(&mut self, allows: Vec<rsip::Method>) -> &mut Self {
583 self.allows = allows;
584 self
585 }
586 pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
587 self.inspector = Some(inspector);
588 self
589 }
590 pub fn build(&mut self) -> Endpoint {
591 let cancel_token = self.cancel_token.take().unwrap_or_default();
592
593 let transport_layer = self
594 .transport_layer
595 .take()
596 .unwrap_or(TransportLayer::new(cancel_token.child_token()));
597
598 let allows = self.allows.to_owned();
599 let user_agent = self.user_agent.to_owned();
600 let timer_interval = self.timer_interval.to_owned();
601 let option = self.option.take();
602 let inspector = self.inspector.take();
603
604 let core = EndpointInner::new(
605 user_agent,
606 transport_layer,
607 cancel_token,
608 timer_interval,
609 allows,
610 option,
611 inspector,
612 );
613
614 Endpoint { inner: core }
615 }
616}
617
618impl Endpoint {
619 pub async fn serve(&self) {
620 let inner = self.inner.clone();
621 match inner.serve().await {
622 Ok(()) => {
623 info!("endpoint shutdown");
624 }
625 Err(e) => {
626 warn!("endpoint serve error: {:?}", e);
627 }
628 }
629 }
630
631 pub fn shutdown(&self) {
632 info!("endpoint shutdown requested");
633 self.inner.cancel_token.cancel();
634 }
635
636 pub fn incoming_transactions(&self) -> Result<TransactionReceiver> {
640 self.inner
641 .incoming_receiver
642 .lock()
643 .unwrap()
644 .take()
645 .ok_or_else(|| Error::EndpointError("incoming recevier taken".to_string()))
646 }
647
648 pub fn get_addrs(&self) -> Vec<SipAddr> {
649 self.inner.transport_layer.get_addrs()
650 }
651}