ftth_rsipstack/transport/
tcp.rs

1use crate::rsip;
2use crate::{
3    transport::{
4        connection::TransportSender,
5        sip_addr::SipAddr,
6        stream::{StreamConnection, StreamConnectionInner},
7        SipConnection,
8    },
9    Result,
10};
11use rsip::SipMessage;
12use std::{fmt, sync::Arc};
13use tokio::net::TcpStream;
14use tokio_util::sync::CancellationToken;
15use tracing::info;
16
17type TcpInner =
18    StreamConnectionInner<tokio::io::ReadHalf<TcpStream>, tokio::io::WriteHalf<TcpStream>>;
19
20#[derive(Clone)]
21pub struct TcpConnection {
22    pub inner: Arc<TcpInner>,
23    pub cancel_token: Option<CancellationToken>,
24}
25
26impl TcpConnection {
27    pub async fn connect(
28        remote: &SipAddr,
29        cancel_token: Option<CancellationToken>,
30    ) -> Result<Self> {
31        let socket_addr = remote.get_socketaddr()?;
32        let stream = TcpStream::connect(socket_addr).await?;
33
34        let local_addr = SipAddr {
35            r#type: Some(rsip::transport::Transport::Tcp),
36            addr: SipConnection::resolve_bind_address(stream.local_addr()?).into(),
37        };
38
39        let (read_half, write_half) = tokio::io::split(stream);
40
41        let connection = TcpConnection {
42            inner: Arc::new(StreamConnectionInner::new(
43                local_addr,
44                remote.clone(),
45                read_half,
46                write_half,
47            )),
48            cancel_token,
49        };
50
51        info!(
52            "Created TCP client connection: {} -> {}",
53            connection.get_addr(),
54            remote
55        );
56
57        Ok(connection)
58    }
59
60    pub fn from_stream(
61        stream: TcpStream,
62        local_addr: SipAddr,
63        cancel_token: Option<CancellationToken>,
64    ) -> Result<Self> {
65        let remote_addr = stream.peer_addr()?;
66        let remote_sip_addr = SipAddr {
67            r#type: Some(rsip::transport::Transport::Tcp),
68            addr: remote_addr.into(),
69        };
70
71        let (read_half, write_half) = tokio::io::split(stream);
72
73        let connection = TcpConnection {
74            inner: Arc::new(StreamConnectionInner::new(
75                local_addr,
76                remote_sip_addr,
77                read_half,
78                write_half,
79            )),
80            cancel_token,
81        };
82
83        info!(
84            "Created TCP server connection: {} <- {}",
85            connection.get_addr(),
86            remote_addr
87        );
88
89        Ok(connection)
90    }
91
92    pub fn cancel_token(&self) -> Option<CancellationToken> {
93        self.cancel_token.clone()
94    }
95}
96
97#[async_trait::async_trait]
98impl StreamConnection for TcpConnection {
99    fn get_addr(&self) -> &SipAddr {
100        &self.inner.local_addr
101    }
102
103    async fn send_message(&self, msg: SipMessage) -> Result<()> {
104        self.inner.send_message(msg).await
105    }
106
107    async fn send_raw(&self, data: &[u8]) -> Result<()> {
108        self.inner.send_raw(data).await
109    }
110
111    async fn serve_loop(&self, sender: TransportSender) -> Result<()> {
112        let sip_connection = SipConnection::Tcp(self.clone());
113        self.inner.serve_loop(sender, sip_connection).await
114    }
115
116    async fn close(&self) -> Result<()> {
117        self.inner.close().await
118    }
119}
120
121impl fmt::Display for TcpConnection {
122    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123        write!(
124            f,
125            "TCP {} -> {}",
126            self.inner.local_addr, self.inner.remote_addr
127        )
128    }
129}
130
131impl fmt::Debug for TcpConnection {
132    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133        fmt::Display::fmt(self, f)
134    }
135}