ftth_rsipstack/transport/
channel.rs

1use tokio_util::sync::CancellationToken;
2
3use super::{
4    connection::{TransportReceiver, TransportSender},
5    SipAddr, SipConnection,
6};
7use crate::rsip;
8use crate::Result;
9use std::sync::{Arc, Mutex};
10
/// State shared between all clones of a `ChannelConnection`.
struct ChannelInner {
    /// Receiving half of the channel. It sits behind `Mutex<Option<..>>` so
    /// that `serve_loop` can move it out through a shared reference; once it
    /// has been taken the slot stays `None`.
    incoming: Mutex<Option<TransportReceiver>>,
    /// Sender that `ChannelConnection::send` pushes `TransportEvent`s onto.
    outgoing: TransportSender,
    /// Address returned by `ChannelConnection::get_addr` and attached as the
    /// source of every event emitted by `send`.
    addr: SipAddr,
}
16
/// Connection backed by a pair of channel endpoints instead of a socket.
///
/// Cloning copies the `Arc`, so every clone refers to the same
/// `ChannelInner` (same receiver slot, sender and address).
#[derive(Clone)]
pub struct ChannelConnection {
    /// Shared channel endpoints and address.
    inner: Arc<ChannelInner>,
    /// Optional token supplied at construction; handed back (cloned) by
    /// `cancel_token()`.
    cancel_token: Option<CancellationToken>,
}
22
23impl ChannelConnection {
24    pub async fn create_connection(
25        incoming: TransportReceiver,
26        outgoing: TransportSender,
27        addr: SipAddr,
28        cancel_token: Option<CancellationToken>,
29    ) -> Result<Self> {
30        let t = ChannelConnection {
31            inner: Arc::new(ChannelInner {
32                incoming: Mutex::new(Some(incoming)),
33                outgoing,
34                addr,
35            }),
36            cancel_token,
37        };
38        Ok(t)
39    }
40
41    pub async fn send(&self, msg: rsip::SipMessage) -> crate::Result<()> {
42        let transport = SipConnection::Channel(self.clone());
43        let source = self.get_addr().clone();
44        self.inner
45            .outgoing
46            .send(super::TransportEvent::Incoming(msg, transport, source))
47            .map_err(|e| e.into())
48    }
49
50    pub fn get_addr(&self) -> &SipAddr {
51        &self.inner.addr
52    }
53
54    pub async fn serve_loop(&self, sender: TransportSender) -> Result<()> {
55        let mut incoming = match self.inner.clone().incoming.lock().unwrap().take() {
56            Some(incoming) => incoming,
57            None => {
58                return Err(crate::Error::Error(
59                    "ChannelTransport::serve_loop called twice".to_string(),
60                ));
61            }
62        };
63        while let Some(event) = incoming.recv().await {
64            sender.send(event)?;
65        }
66        Ok(())
67    }
68    pub async fn close(&self) -> Result<()> {
69        Ok(())
70    }
71    pub fn cancel_token(&self) -> Option<CancellationToken> {
72        self.cancel_token.clone()
73    }
74}
75
76impl std::fmt::Display for ChannelConnection {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        write!(f, "*:*")
79    }
80}
81
82impl std::fmt::Debug for ChannelConnection {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        write!(f, "*:*")
85    }
86}