ftth_rsipstack/transport/
tcp.rs

1use crate::{
2    transport::{
3        connection::TransportSender,
4        sip_addr::SipAddr,
5        stream::{StreamConnection, StreamConnectionInner},
6        SipConnection,
7    },
8    Result,
9};
10use rsip::SipMessage;
11use std::{fmt, sync::Arc};
12use tokio::net::TcpStream;
13use tokio_util::sync::CancellationToken;
14use tracing::info;
15
/// Stream connection state specialised for TCP: a [`StreamConnectionInner`]
/// parameterised over the read half and write half of a [`TcpStream`].
type TcpInner =
    StreamConnectionInner<tokio::io::ReadHalf<TcpStream>, tokio::io::WriteHalf<TcpStream>>;
18
/// A SIP transport connection carried over a TCP stream.
///
/// The connection state lives behind an [`Arc`], so every clone of a
/// `TcpConnection` refers to the same underlying [`TcpInner`].
#[derive(Clone)]
pub struct TcpConnection {
    /// Reference-counted stream state, built from the local address, the
    /// remote address and the split read/write halves of the TCP stream.
    pub inner: Arc<TcpInner>,
    /// Optional cancellation token handed in by whoever created the
    /// connection; a clone of it is returned by `cancel_token()`.
    pub cancel_token: Option<CancellationToken>,
}
24
25impl TcpConnection {
26    pub async fn connect(
27        remote: &SipAddr,
28        cancel_token: Option<CancellationToken>,
29    ) -> Result<Self> {
30        let socket_addr = remote.get_socketaddr()?;
31        let stream = TcpStream::connect(socket_addr).await?;
32
33        let local_addr = SipAddr {
34            r#type: Some(rsip::transport::Transport::Tcp),
35            addr: SipConnection::resolve_bind_address(stream.local_addr()?).into(),
36        };
37
38        let (read_half, write_half) = tokio::io::split(stream);
39
40        let connection = TcpConnection {
41            inner: Arc::new(StreamConnectionInner::new(
42                local_addr,
43                remote.clone(),
44                read_half,
45                write_half,
46            )),
47            cancel_token,
48        };
49
50        info!(
51            "Created TCP client connection: {} -> {}",
52            connection.get_addr(),
53            remote
54        );
55
56        Ok(connection)
57    }
58
59    pub fn from_stream(
60        stream: TcpStream,
61        local_addr: SipAddr,
62        cancel_token: Option<CancellationToken>,
63    ) -> Result<Self> {
64        let remote_addr = stream.peer_addr()?;
65        let remote_sip_addr = SipAddr {
66            r#type: Some(rsip::transport::Transport::Tcp),
67            addr: remote_addr.into(),
68        };
69
70        let (read_half, write_half) = tokio::io::split(stream);
71
72        let connection = TcpConnection {
73            inner: Arc::new(StreamConnectionInner::new(
74                local_addr,
75                remote_sip_addr,
76                read_half,
77                write_half,
78            )),
79            cancel_token,
80        };
81
82        info!(
83            "Created TCP server connection: {} <- {}",
84            connection.get_addr(),
85            remote_addr
86        );
87
88        Ok(connection)
89    }
90
91    pub fn cancel_token(&self) -> Option<CancellationToken> {
92        self.cancel_token.clone()
93    }
94}
95
96#[async_trait::async_trait]
97impl StreamConnection for TcpConnection {
98    fn get_addr(&self) -> &SipAddr {
99        &self.inner.local_addr
100    }
101
102    async fn send_message(&self, msg: SipMessage) -> Result<()> {
103        self.inner.send_message(msg).await
104    }
105
106    async fn send_raw(&self, data: &[u8]) -> Result<()> {
107        self.inner.send_raw(data).await
108    }
109
110    async fn serve_loop(&self, sender: TransportSender) -> Result<()> {
111        let sip_connection = SipConnection::Tcp(self.clone());
112        self.inner.serve_loop(sender, sip_connection).await
113    }
114
115    async fn close(&self) -> Result<()> {
116        self.inner.close().await
117    }
118}
119
120impl fmt::Display for TcpConnection {
121    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122        write!(
123            f,
124            "TCP {} -> {}",
125            self.inner.local_addr, self.inner.remote_addr
126        )
127    }
128}
129
130impl fmt::Debug for TcpConnection {
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        fmt::Display::fmt(self, f)
133    }
134}