1use super::{
2 key::TransactionKey,
3 make_via_branch,
4 timer::Timer,
5 transaction::{Transaction, TransactionEvent, TransactionEventSender},
6 SipConnection, TransactionReceiver, TransactionSender, TransactionTimer,
7};
8use crate::{
9 dialog::DialogId,
10 transport::{SipAddr, TransportEvent, TransportLayer},
11 Error, Result, VERSION,
12};
13use rsip::{prelude::HeadersExt, SipMessage};
14use std::{
15 collections::HashMap,
16 sync::{Arc, Mutex, RwLock},
17 time::{Duration, Instant},
18};
19use tokio::{
20 select,
21 sync::mpsc::{error, unbounded_channel},
22};
23use tokio_util::sync::CancellationToken;
24use tracing::{debug, info, trace, warn};
25
26pub trait MessageInspector: Send + Sync {
27 fn before_send(&self, msg: SipMessage) -> SipMessage;
28 fn after_received(&self, msg: SipMessage) -> SipMessage;
29}
30
31pub struct EndpointOption {
32 pub t1: Duration,
33 pub t4: Duration,
34 pub t1x64: Duration,
35 pub timerc: Duration,
36 pub ignore_out_of_dialog_option: bool,
37 pub callid_suffix: Option<String>,
38 pub dialog_keepalive_duration: Option<Duration>,
39}
40
41impl Default for EndpointOption {
42 fn default() -> Self {
43 EndpointOption {
44 t1: Duration::from_millis(500),
45 t4: Duration::from_secs(4),
46 t1x64: Duration::from_millis(64 * 500),
47 timerc: Duration::from_secs(180),
48 ignore_out_of_dialog_option: true,
49 callid_suffix: None,
50 dialog_keepalive_duration: Some(Duration::from_secs(60)),
51 }
52 }
53}
54
55pub struct EndpointStats {
56 pub running_transactions: usize,
57 pub finished_transactions: usize,
58 pub waiting_ack: usize,
59}
60
61pub struct EndpointInner {
94 pub allows: Mutex<Option<Vec<rsip::Method>>>,
95 pub user_agent: String,
96 pub timers: Timer<TransactionTimer>,
97 pub transport_layer: TransportLayer,
98 pub finished_transactions: RwLock<HashMap<TransactionKey, Option<SipMessage>>>,
99 pub transactions: RwLock<HashMap<TransactionKey, TransactionEventSender>>,
100 pub waiting_ack: RwLock<HashMap<DialogId, TransactionKey>>,
101 incoming_sender: TransactionSender,
102 incoming_receiver: Mutex<Option<TransactionReceiver>>,
103 cancel_token: CancellationToken,
104 timer_interval: Duration,
105 pub(super) inspector: Option<Box<dyn MessageInspector>>,
106 pub option: EndpointOption,
107}
108pub type EndpointInnerRef = Arc<EndpointInner>;
109
110pub struct EndpointBuilder {
129 allows: Vec<rsip::Method>,
130 user_agent: String,
131 transport_layer: Option<TransportLayer>,
132 cancel_token: Option<CancellationToken>,
133 timer_interval: Option<Duration>,
134 option: Option<EndpointOption>,
135 inspector: Option<Box<dyn MessageInspector>>,
136}
137
138pub struct Endpoint {
190 pub inner: EndpointInnerRef,
191}
192
193impl EndpointInner {
194 pub fn new(
195 user_agent: String,
196 transport_layer: TransportLayer,
197 cancel_token: CancellationToken,
198 timer_interval: Option<Duration>,
199 allows: Vec<rsip::Method>,
200 option: Option<EndpointOption>,
201 inspector: Option<Box<dyn MessageInspector>>,
202 ) -> Arc<Self> {
203 let (incoming_sender, incoming_receiver) = unbounded_channel();
204 Arc::new(EndpointInner {
205 allows: Mutex::new(Some(allows)),
206 user_agent,
207 timers: Timer::new(),
208 transport_layer,
209 transactions: RwLock::new(HashMap::new()),
210 finished_transactions: RwLock::new(HashMap::new()),
211 waiting_ack: RwLock::new(HashMap::new()),
212 timer_interval: timer_interval.unwrap_or(Duration::from_millis(20)),
213 cancel_token,
214 incoming_sender,
215 incoming_receiver: Mutex::new(Some(incoming_receiver)),
216 option: option.unwrap_or_default(),
217 inspector,
218 })
219 }
220
221 pub async fn serve(self: &Arc<Self>) -> Result<()> {
222 select! {
223 _ = self.cancel_token.cancelled() => {
224 },
225 r = self.process_timer() => {
226 _ = r?
227 },
228 r = self.clone().process_transport_layer() => {
229 _ = r?
230 },
231 }
232 Ok(())
233 }
234
235 async fn process_transport_layer(self: Arc<Self>) -> Result<()> {
237 self.transport_layer.serve_listens().await.ok();
238
239 let mut transport_rx = match self
240 .transport_layer
241 .inner
242 .transport_rx
243 .lock()
244 .unwrap()
245 .take()
246 {
247 Some(rx) => rx,
248 None => {
249 return Err(Error::EndpointError("transport_rx not set".to_string()));
250 }
251 };
252
253 while let Some(event) = transport_rx.recv().await {
254 match event {
255 TransportEvent::Incoming(msg, connection, from) => {
256 match self.on_received_message(msg, connection).await {
257 Ok(()) => {}
258 Err(e) => {
259 warn!(addr=%from,"on_received_message error: {}", e);
260 }
261 }
262 }
263 TransportEvent::New(t) => {
264 info!(addr=%t.get_addr(), "new connection");
265 }
266 TransportEvent::Closed(t) => {
267 info!(addr=%t.get_addr(), "closed connection");
268 }
269 }
270 }
271 Ok(())
272 }
273
274 pub async fn process_timer(&self) -> Result<()> {
275 let mut ticker = tokio::time::interval(self.timer_interval);
276 loop {
277 for t in self.timers.poll(Instant::now()) {
278 match t {
279 TransactionTimer::TimerCleanup(key) => {
280 trace!(%key, "TimerCleanup");
281 self.transactions
282 .write()
283 .as_mut()
284 .map(|ts| ts.remove(&key))
285 .ok();
286 self.finished_transactions
287 .write()
288 .as_mut()
289 .map(|t| t.remove(&key))
290 .ok();
291 continue;
292 }
293 _ => {}
294 }
295
296 if let Ok(Some(tu)) =
297 { self.transactions.read().as_ref().map(|ts| ts.get(&t.key())) }
298 {
299 match tu.send(TransactionEvent::Timer(t)) {
300 Ok(_) => {}
301 Err(error::SendError(t)) => match t {
302 TransactionEvent::Timer(t) => {
303 self.detach_transaction(t.key(), None);
304 }
305 _ => {}
306 },
307 }
308 }
309 }
310 ticker.tick().await;
311 }
312 }
313
314 pub async fn on_received_message(
316 self: &Arc<Self>,
317 msg: SipMessage,
318 connection: SipConnection,
319 ) -> Result<()> {
320 let mut key = match &msg {
321 SipMessage::Request(req) => {
322 TransactionKey::from_request(req, super::key::TransactionRole::Server)?
323 }
324 SipMessage::Response(resp) => {
325 TransactionKey::from_response(resp, super::key::TransactionRole::Client)?
326 }
327 };
328 match &msg {
329 SipMessage::Request(req) => {
330 match req.method() {
331 rsip::Method::Ack => match DialogId::try_from(req) {
332 Ok(dialog_id) => {
333 let tx_key = self
334 .waiting_ack
335 .read()
336 .map(|wa| wa.get(&dialog_id).cloned());
337 if let Ok(Some(tx_key)) = tx_key {
338 key = tx_key;
339 }
340 }
341 Err(_) => {}
342 },
343 _ => {}
344 }
345 let last_message = self
347 .finished_transactions
348 .read()
349 .unwrap()
350 .get(&key)
351 .cloned()
352 .flatten();
353
354 if let Some(last_message) = last_message {
355 connection.send(last_message, None).await?;
356 return Ok(());
357 }
358 }
359 SipMessage::Response(resp) => {
360 let last_message = self
361 .finished_transactions
362 .read()
363 .unwrap()
364 .get(&key)
365 .cloned()
366 .flatten();
367
368 if let Some(mut last_message) = last_message {
369 match last_message {
370 SipMessage::Request(ref mut last_req) => {
371 if last_req.method() == &rsip::Method::Ack {
372 match resp.status_code.kind() {
373 rsip::StatusCodeKind::Provisional => {
374 return Ok(());
375 }
376 rsip::StatusCodeKind::Successful => {
377 if last_req.to_header()?.tag().ok().is_none() {
378 return Ok(());
380 }
381 }
382 _ => {}
383 }
384 if let Ok(Some(tag)) = resp.to_header()?.tag() {
385 last_req.to_header_mut().and_then(|h| h.mut_tag(tag)).ok();
386 }
387 }
388 }
389 _ => {}
390 }
391 connection.send(last_message, None).await?;
392 return Ok(());
393 }
394 }
395 };
396
397 let msg = if let Some(inspector) = &self.inspector {
398 inspector.after_received(msg)
399 } else {
400 msg
401 };
402
403 if let Some(tu) = self.transactions.read().unwrap().get(&key) {
404 tu.send(TransactionEvent::Received(msg, Some(connection)))
405 .map_err(|e| Error::TransactionError(e.to_string(), key))?;
406 return Ok(());
407 }
408 let request = match msg {
410 SipMessage::Request(req) => req,
411 SipMessage::Response(resp) => {
412 if resp.cseq_header()?.method()? != rsip::Method::Cancel {
413 debug!(%key, "the transaction is not exist {}", resp);
414 }
415 return Ok(());
416 }
417 };
418
419 match request.method {
420 rsip::Method::Cancel => {
421 let resp = self.make_response(
422 &request,
423 rsip::StatusCode::CallTransactionDoesNotExist,
424 None,
425 );
426 let resp = if let Some(ref inspector) = self.inspector {
427 inspector.before_send(resp.into())
428 } else {
429 resp.into()
430 };
431 connection.send(resp, None).await?;
432 return Ok(());
433 }
434 rsip::Method::Ack => return Ok(()),
435 _ => {}
436 }
437
438 let tx =
439 Transaction::new_server(key.clone(), request.clone(), self.clone(), Some(connection));
440
441 self.incoming_sender.send(tx).ok();
442 Ok(())
443 }
444
445 pub fn attach_transaction(&self, key: &TransactionKey, tu_sender: TransactionEventSender) {
446 trace!(%key, "attach transaction");
447 self.transactions
448 .write()
449 .as_mut()
450 .map(|ts| ts.insert(key.clone(), tu_sender))
451 .ok();
452 }
453
454 pub fn detach_transaction(&self, key: &TransactionKey, last_message: Option<SipMessage>) {
455 trace!(%key, "detach transaction");
456 self.transactions
457 .write()
458 .as_mut()
459 .map(|ts| ts.remove(key))
460 .ok();
461
462 if let Some(msg) = last_message {
463 self.timers.timeout(
464 self.option.t1x64,
465 TransactionTimer::TimerCleanup(key.clone()), );
467
468 self.finished_transactions
469 .write()
470 .as_mut()
471 .map(|ft| ft.insert(key.clone(), Some(msg)))
472 .ok();
473 }
474 }
475
476 pub fn get_addrs(&self) -> Vec<SipAddr> {
477 self.transport_layer.get_addrs()
478 }
479
480 pub fn get_record_route(&self) -> Result<rsip::typed::RecordRoute> {
481 let first_addr = self
482 .transport_layer
483 .get_addrs()
484 .first()
485 .ok_or(Error::EndpointError("not sipaddrs".to_string()))
486 .cloned()?;
487 let rr = rsip::UriWithParamsList(vec![rsip::UriWithParams {
488 uri: first_addr.into(),
489 params: vec![rsip::Param::Other("lr".into(), None)],
490 }]);
491 Ok(rr.into())
492 }
493
494 pub fn get_via(
495 &self,
496 addr: Option<crate::transport::SipAddr>,
497 branch: Option<rsip::Param>,
498 ) -> Result<rsip::typed::Via> {
499 let first_addr = match addr {
500 Some(addr) => addr,
501 None => self
502 .transport_layer
503 .get_addrs()
504 .first()
505 .ok_or(Error::EndpointError("not sipaddrs".to_string()))
506 .cloned()?,
507 };
508
509 let via = rsip::typed::Via {
510 version: rsip::Version::V2,
511 transport: first_addr.r#type.unwrap_or_default(),
512 uri: first_addr.addr.into(),
513 params: vec![
514 branch.unwrap_or_else(make_via_branch),
515 rsip::Param::Other("rport".into(), None),
516 ],
517 };
518 Ok(via)
519 }
520
521 pub fn get_stats(&self) -> EndpointStats {
522 let waiting_ack = self
523 .waiting_ack
524 .read()
525 .map(|wa| wa.len())
526 .unwrap_or_default();
527 let running_transactions = self
528 .transactions
529 .read()
530 .map(|ts| ts.len())
531 .unwrap_or_default();
532 let finished_transactions = self
533 .finished_transactions
534 .read()
535 .map(|ft| ft.len())
536 .unwrap_or_default();
537
538 EndpointStats {
539 running_transactions,
540 finished_transactions,
541 waiting_ack,
542 }
543 }
544}
545
546impl EndpointBuilder {
547 pub fn new() -> Self {
548 EndpointBuilder {
549 allows: Vec::new(),
550 user_agent: VERSION.to_string(),
551 transport_layer: None,
552 cancel_token: None,
553 timer_interval: None,
554 option: None,
555 inspector: None,
556 }
557 }
558 pub fn with_option(&mut self, option: EndpointOption) -> &mut Self {
559 self.option = Some(option);
560 self
561 }
562 pub fn with_user_agent(&mut self, user_agent: &str) -> &mut Self {
563 self.user_agent = user_agent.to_string();
564 self
565 }
566
567 pub fn with_transport_layer(&mut self, transport_layer: TransportLayer) -> &mut Self {
568 self.transport_layer.replace(transport_layer);
569 self
570 }
571
572 pub fn with_cancel_token(&mut self, cancel_token: CancellationToken) -> &mut Self {
573 self.cancel_token.replace(cancel_token);
574 self
575 }
576
577 pub fn with_timer_interval(&mut self, timer_interval: Duration) -> &mut Self {
578 self.timer_interval.replace(timer_interval);
579 self
580 }
581 pub fn with_allows(&mut self, allows: Vec<rsip::Method>) -> &mut Self {
582 self.allows = allows;
583 self
584 }
585 pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
586 self.inspector = Some(inspector);
587 self
588 }
589 pub fn build(&mut self) -> Endpoint {
590 let cancel_token = self.cancel_token.take().unwrap_or_default();
591
592 let transport_layer = self
593 .transport_layer
594 .take()
595 .unwrap_or(TransportLayer::new(cancel_token.child_token()));
596
597 let allows = self.allows.to_owned();
598 let user_agent = self.user_agent.to_owned();
599 let timer_interval = self.timer_interval.to_owned();
600 let option = self.option.take();
601 let inspector = self.inspector.take();
602
603 let core = EndpointInner::new(
604 user_agent,
605 transport_layer,
606 cancel_token,
607 timer_interval,
608 allows,
609 option,
610 inspector,
611 );
612
613 Endpoint { inner: core }
614 }
615}
616
617impl Endpoint {
618 pub async fn serve(&self) {
619 let inner = self.inner.clone();
620 match inner.serve().await {
621 Ok(()) => {
622 info!("endpoint shutdown");
623 }
624 Err(e) => {
625 warn!("endpoint serve error: {:?}", e);
626 }
627 }
628 }
629
630 pub fn shutdown(&self) {
631 info!("endpoint shutdown requested");
632 self.inner.cancel_token.cancel();
633 }
634
635 pub fn incoming_transactions(&self) -> Result<TransactionReceiver> {
639 self.inner
640 .incoming_receiver
641 .lock()
642 .unwrap()
643 .take()
644 .ok_or_else(|| Error::EndpointError("incoming recevier taken".to_string()))
645 }
646
647 pub fn get_addrs(&self) -> Vec<SipAddr> {
648 self.inner.transport_layer.get_addrs()
649 }
650}