ftth_rsipstack/transport/
tcp.rs1use crate::{
2 transport::{
3 connection::TransportSender,
4 sip_addr::SipAddr,
5 stream::{StreamConnection, StreamConnectionInner},
6 SipConnection,
7 },
8 Result,
9};
10use rsip::SipMessage;
11use std::{fmt, sync::Arc};
12use tokio::net::TcpStream;
13use tokio_util::sync::CancellationToken;
14use tracing::info;
15
16type TcpInner =
17 StreamConnectionInner<tokio::io::ReadHalf<TcpStream>, tokio::io::WriteHalf<TcpStream>>;
18
19#[derive(Clone)]
20pub struct TcpConnection {
21 pub inner: Arc<TcpInner>,
22 pub cancel_token: Option<CancellationToken>,
23}
24
25impl TcpConnection {
26 pub async fn connect(
27 remote: &SipAddr,
28 cancel_token: Option<CancellationToken>,
29 ) -> Result<Self> {
30 let socket_addr = remote.get_socketaddr()?;
31 let stream = TcpStream::connect(socket_addr).await?;
32
33 let local_addr = SipAddr {
34 r#type: Some(rsip::transport::Transport::Tcp),
35 addr: SipConnection::resolve_bind_address(stream.local_addr()?).into(),
36 };
37
38 let (read_half, write_half) = tokio::io::split(stream);
39
40 let connection = TcpConnection {
41 inner: Arc::new(StreamConnectionInner::new(
42 local_addr,
43 remote.clone(),
44 read_half,
45 write_half,
46 )),
47 cancel_token,
48 };
49
50 info!(
51 "Created TCP client connection: {} -> {}",
52 connection.get_addr(),
53 remote
54 );
55
56 Ok(connection)
57 }
58
59 pub fn from_stream(
60 stream: TcpStream,
61 local_addr: SipAddr,
62 cancel_token: Option<CancellationToken>,
63 ) -> Result<Self> {
64 let remote_addr = stream.peer_addr()?;
65 let remote_sip_addr = SipAddr {
66 r#type: Some(rsip::transport::Transport::Tcp),
67 addr: remote_addr.into(),
68 };
69
70 let (read_half, write_half) = tokio::io::split(stream);
71
72 let connection = TcpConnection {
73 inner: Arc::new(StreamConnectionInner::new(
74 local_addr,
75 remote_sip_addr,
76 read_half,
77 write_half,
78 )),
79 cancel_token,
80 };
81
82 info!(
83 "Created TCP server connection: {} <- {}",
84 connection.get_addr(),
85 remote_addr
86 );
87
88 Ok(connection)
89 }
90
91 pub fn cancel_token(&self) -> Option<CancellationToken> {
92 self.cancel_token.clone()
93 }
94}
95
96#[async_trait::async_trait]
97impl StreamConnection for TcpConnection {
98 fn get_addr(&self) -> &SipAddr {
99 &self.inner.local_addr
100 }
101
102 async fn send_message(&self, msg: SipMessage) -> Result<()> {
103 self.inner.send_message(msg).await
104 }
105
106 async fn send_raw(&self, data: &[u8]) -> Result<()> {
107 self.inner.send_raw(data).await
108 }
109
110 async fn serve_loop(&self, sender: TransportSender) -> Result<()> {
111 let sip_connection = SipConnection::Tcp(self.clone());
112 self.inner.serve_loop(sender, sip_connection).await
113 }
114
115 async fn close(&self) -> Result<()> {
116 self.inner.close().await
117 }
118}
119
120impl fmt::Display for TcpConnection {
121 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122 write!(
123 f,
124 "TCP {} -> {}",
125 self.inner.local_addr, self.inner.remote_addr
126 )
127 }
128}
129
130impl fmt::Debug for TcpConnection {
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 fmt::Display::fmt(self, f)
133 }
134}