ftth_rsipstack/transport/
tcp.rs1use crate::rsip;
2use crate::{
3 transport::{
4 connection::TransportSender,
5 sip_addr::SipAddr,
6 stream::{StreamConnection, StreamConnectionInner},
7 SipConnection,
8 },
9 Result,
10};
11use rsip::SipMessage;
12use std::{fmt, sync::Arc};
13use tokio::net::TcpStream;
14use tokio_util::sync::CancellationToken;
15use tracing::info;
16
17type TcpInner =
18 StreamConnectionInner<tokio::io::ReadHalf<TcpStream>, tokio::io::WriteHalf<TcpStream>>;
19
20#[derive(Clone)]
21pub struct TcpConnection {
22 pub inner: Arc<TcpInner>,
23 pub cancel_token: Option<CancellationToken>,
24}
25
26impl TcpConnection {
27 pub async fn connect(
28 remote: &SipAddr,
29 cancel_token: Option<CancellationToken>,
30 ) -> Result<Self> {
31 let socket_addr = remote.get_socketaddr()?;
32 let stream = TcpStream::connect(socket_addr).await?;
33
34 let local_addr = SipAddr {
35 r#type: Some(rsip::transport::Transport::Tcp),
36 addr: SipConnection::resolve_bind_address(stream.local_addr()?).into(),
37 };
38
39 let (read_half, write_half) = tokio::io::split(stream);
40
41 let connection = TcpConnection {
42 inner: Arc::new(StreamConnectionInner::new(
43 local_addr,
44 remote.clone(),
45 read_half,
46 write_half,
47 )),
48 cancel_token,
49 };
50
51 info!(
52 "Created TCP client connection: {} -> {}",
53 connection.get_addr(),
54 remote
55 );
56
57 Ok(connection)
58 }
59
60 pub fn from_stream(
61 stream: TcpStream,
62 local_addr: SipAddr,
63 cancel_token: Option<CancellationToken>,
64 ) -> Result<Self> {
65 let remote_addr = stream.peer_addr()?;
66 let remote_sip_addr = SipAddr {
67 r#type: Some(rsip::transport::Transport::Tcp),
68 addr: remote_addr.into(),
69 };
70
71 let (read_half, write_half) = tokio::io::split(stream);
72
73 let connection = TcpConnection {
74 inner: Arc::new(StreamConnectionInner::new(
75 local_addr,
76 remote_sip_addr,
77 read_half,
78 write_half,
79 )),
80 cancel_token,
81 };
82
83 info!(
84 "Created TCP server connection: {} <- {}",
85 connection.get_addr(),
86 remote_addr
87 );
88
89 Ok(connection)
90 }
91
92 pub fn cancel_token(&self) -> Option<CancellationToken> {
93 self.cancel_token.clone()
94 }
95}
96
97#[async_trait::async_trait]
98impl StreamConnection for TcpConnection {
99 fn get_addr(&self) -> &SipAddr {
100 &self.inner.local_addr
101 }
102
103 async fn send_message(&self, msg: SipMessage) -> Result<()> {
104 self.inner.send_message(msg).await
105 }
106
107 async fn send_raw(&self, data: &[u8]) -> Result<()> {
108 self.inner.send_raw(data).await
109 }
110
111 async fn serve_loop(&self, sender: TransportSender) -> Result<()> {
112 let sip_connection = SipConnection::Tcp(self.clone());
113 self.inner.serve_loop(sender, sip_connection).await
114 }
115
116 async fn close(&self) -> Result<()> {
117 self.inner.close().await
118 }
119}
120
121impl fmt::Display for TcpConnection {
122 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123 write!(
124 f,
125 "TCP {} -> {}",
126 self.inner.local_addr, self.inner.remote_addr
127 )
128 }
129}
130
131impl fmt::Debug for TcpConnection {
132 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133 fmt::Display::fmt(self, f)
134 }
135}