use std::{
net::{IpAddr, SocketAddr},
sync::{Arc, Mutex},
thread,
time::Duration,
};
use futures::channel;
use tentacle::{
ProtocolId, async_trait,
builder::{MetaBuilder, ServiceBuilder},
context::{ProtocolContext, ProtocolContextMutRef},
multiaddr::Multiaddr,
secio::SecioKeyPair,
service::{ProtocolHandle, ProtocolMeta, Service, ServiceEvent},
traits::{ServiceHandle, ServiceProtocol},
};
#[cfg(feature = "ws")]
use tokio::io::AsyncReadExt;
use tokio::{io::AsyncWriteExt, net::TcpStream};
fn build_proxy_v1_header(src_ip: &str, dst_ip: &str, src_port: u16, dst_port: u16) -> String {
let protocol = if src_ip.contains(':') { "TCP6" } else { "TCP4" };
format!(
"PROXY {} {} {} {} {}\r\n",
protocol, src_ip, dst_ip, src_port, dst_port
)
}
fn build_proxy_v2_header_ipv4(
src_ip: [u8; 4],
dst_ip: [u8; 4],
src_port: u16,
dst_port: u16,
) -> Vec<u8> {
let mut header = Vec::new();
header.extend_from_slice(&[
0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A,
]);
header.push(0x21);
header.push(0x11);
header.extend_from_slice(&12u16.to_be_bytes());
header.extend_from_slice(&src_ip);
header.extend_from_slice(&dst_ip);
header.extend_from_slice(&src_port.to_be_bytes());
header.extend_from_slice(&dst_port.to_be_bytes());
header
}
fn build_proxy_v2_header_ipv6(
src_ip: [u8; 16],
dst_ip: [u8; 16],
src_port: u16,
dst_port: u16,
) -> Vec<u8> {
let mut header = Vec::new();
header.extend_from_slice(&[
0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A,
]);
header.push(0x21);
header.push(0x21);
header.extend_from_slice(&36u16.to_be_bytes());
header.extend_from_slice(&src_ip);
header.extend_from_slice(&dst_ip);
header.extend_from_slice(&src_port.to_be_bytes());
header.extend_from_slice(&dst_port.to_be_bytes());
header
}
#[derive(Clone, Default)]
struct CollectedAddresses {
inner: Arc<Mutex<Vec<Multiaddr>>>,
}
impl CollectedAddresses {
fn push(&self, addr: Multiaddr) {
self.inner.lock().unwrap().push(addr);
}
fn get_all(&self) -> Vec<Multiaddr> {
self.inner.lock().unwrap().clone()
}
}
struct AddressCollectorHandle {
collected: CollectedAddresses,
sender: crossbeam_channel::Sender<()>,
}
#[async_trait]
impl ServiceHandle for AddressCollectorHandle {
async fn handle_event(
&mut self,
_context: &mut tentacle::context::ServiceContext,
event: ServiceEvent,
) {
if let ServiceEvent::SessionOpen { session_context } = event {
self.collected.push(session_context.address.clone());
self.sender.try_send(()).unwrap();
}
}
}
struct TestProtocol;
#[async_trait]
impl ServiceProtocol for TestProtocol {
async fn init(&mut self, _context: &mut ProtocolContext) {}
async fn connected(&mut self, _context: ProtocolContextMutRef<'_>, _version: &str) {}
async fn disconnected(&mut self, _context: ProtocolContextMutRef<'_>) {}
}
fn create_meta(id: ProtocolId) -> ProtocolMeta {
MetaBuilder::new()
.id(id)
.service_handle(move || {
let handle = Box::new(TestProtocol);
ProtocolHandle::Callback(handle)
})
.build()
}
fn create_service(
collected: CollectedAddresses,
sender: crossbeam_channel::Sender<()>,
) -> Service<AddressCollectorHandle, SecioKeyPair> {
let meta = create_meta(1.into());
ServiceBuilder::default()
.insert_protocol(meta)
.forever(false)
.build(AddressCollectorHandle { collected, sender })
}
fn extract_ip_from_multiaddr(addr: &Multiaddr) -> Option<IpAddr> {
use tentacle::multiaddr::Protocol;
for proto in addr.iter() {
match proto {
Protocol::Ip4(ip) => return Some(IpAddr::V4(ip)),
Protocol::Ip6(ip) => return Some(IpAddr::V6(ip)),
_ => continue,
}
}
None
}
#[test]
fn test_proxy_protocol_v1_ipv4() {
let collected = CollectedAddresses::default();
let (sender, receiver) = crossbeam_channel::bounded(1);
let (addr_sender, addr_receiver) = channel::oneshot::channel::<Multiaddr>();
let collected_clone = collected.clone();
thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut service = create_service(collected_clone, sender);
rt.block_on(async move {
let listen_addr = service
.listen("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.await
.unwrap();
addr_sender.send(listen_addr).unwrap();
service.run().await
});
});
let listen_addr = futures::executor::block_on(addr_receiver).unwrap();
let socket_addr: SocketAddr = {
use tentacle::multiaddr::Protocol;
let mut ip = None;
let mut port = None;
for proto in listen_addr.iter() {
match proto {
Protocol::Ip4(i) => ip = Some(IpAddr::V4(i)),
Protocol::Tcp(p) => port = Some(p),
_ => {}
}
}
SocketAddr::new(ip.unwrap(), port.unwrap())
};
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let mut stream = TcpStream::connect(socket_addr).await.unwrap();
let proxy_header = build_proxy_v1_header("203.0.113.50", "192.168.1.1", 54321, 80);
stream.write_all(proxy_header.as_bytes()).await.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
});
receiver.recv_timeout(Duration::from_secs(5)).unwrap();
thread::sleep(Duration::from_millis(100));
let addresses = collected.get_all();
assert!(
!addresses.is_empty(),
"Should have collected at least one address"
);
let first_addr = &addresses[0];
let ip = extract_ip_from_multiaddr(first_addr);
assert!(ip.is_some(), "Should be able to extract IP from address");
assert_eq!(
ip.unwrap().to_string(),
"203.0.113.50",
"Should use the IP from PROXY protocol header"
);
}
#[test]
fn test_proxy_protocol_v2_ipv4() {
let collected = CollectedAddresses::default();
let (sender, receiver) = crossbeam_channel::bounded(1);
let (addr_sender, addr_receiver) = channel::oneshot::channel::<Multiaddr>();
let collected_clone = collected.clone();
thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut service = create_service(collected_clone, sender);
rt.block_on(async move {
let listen_addr = service
.listen("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.await
.unwrap();
addr_sender.send(listen_addr).unwrap();
service.run().await
});
});
let listen_addr = futures::executor::block_on(addr_receiver).unwrap();
let socket_addr: SocketAddr = {
use tentacle::multiaddr::Protocol;
let mut ip = None;
let mut port = None;
for proto in listen_addr.iter() {
match proto {
Protocol::Ip4(i) => ip = Some(IpAddr::V4(i)),
Protocol::Tcp(p) => port = Some(p),
_ => {}
}
}
SocketAddr::new(ip.unwrap(), port.unwrap())
};
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let mut stream = TcpStream::connect(socket_addr).await.unwrap();
let proxy_header = build_proxy_v2_header_ipv4(
[10, 20, 30, 40], [192, 168, 1, 1], 12345, 80, );
stream.write_all(&proxy_header).await.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
});
receiver.recv_timeout(Duration::from_secs(5)).unwrap();
thread::sleep(Duration::from_millis(100));
let addresses = collected.get_all();
assert!(
!addresses.is_empty(),
"Should have collected at least one address"
);
let first_addr = &addresses[0];
let ip = extract_ip_from_multiaddr(first_addr);
assert!(ip.is_some(), "Should be able to extract IP from address");
assert_eq!(
ip.unwrap().to_string(),
"10.20.30.40",
"Should use the IP from PROXY protocol v2 header"
);
}
#[test]
fn test_proxy_protocol_v1_ipv6() {
let collected = CollectedAddresses::default();
let (sender, receiver) = crossbeam_channel::bounded(1);
let (addr_sender, addr_receiver) = channel::oneshot::channel::<Multiaddr>();
let collected_clone = collected.clone();
thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut service = create_service(collected_clone, sender);
rt.block_on(async move {
let listen_addr = service
.listen("/ip6/::1/tcp/0".parse().unwrap())
.await
.unwrap();
addr_sender.send(listen_addr).unwrap();
service.run().await
});
});
let listen_addr = futures::executor::block_on(addr_receiver).unwrap();
let socket_addr: SocketAddr = {
use tentacle::multiaddr::Protocol;
let mut ip = None;
let mut port = None;
for proto in listen_addr.iter() {
match proto {
Protocol::Ip6(i) => ip = Some(IpAddr::V6(i)),
Protocol::Tcp(p) => port = Some(p),
_ => {}
}
}
SocketAddr::new(ip.unwrap(), port.unwrap())
};
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let mut stream = TcpStream::connect(socket_addr).await.unwrap();
let proxy_header = build_proxy_v1_header("2001:db8::1", "2001:db8::2", 54321, 80);
stream.write_all(proxy_header.as_bytes()).await.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
});
receiver.recv_timeout(Duration::from_secs(5)).unwrap();
thread::sleep(Duration::from_millis(100));
let addresses = collected.get_all();
assert!(
!addresses.is_empty(),
"Should have collected at least one address"
);
let first_addr = &addresses[0];
let ip = extract_ip_from_multiaddr(first_addr);
assert!(ip.is_some(), "Should be able to extract IP from address");
assert_eq!(
ip.unwrap().to_string(),
"2001:db8::1",
"Should use the IPv6 from PROXY protocol header"
);
}
#[test]
fn test_proxy_protocol_v2_ipv6() {
let collected = CollectedAddresses::default();
let (sender, receiver) = crossbeam_channel::bounded(1);
let (addr_sender, addr_receiver) = channel::oneshot::channel::<Multiaddr>();
let collected_clone = collected.clone();
thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut service = create_service(collected_clone, sender);
rt.block_on(async move {
let listen_addr = service
.listen("/ip6/::1/tcp/0".parse().unwrap())
.await
.unwrap();
addr_sender.send(listen_addr).unwrap();
service.run().await
});
});
let listen_addr = futures::executor::block_on(addr_receiver).unwrap();
let socket_addr: SocketAddr = {
use tentacle::multiaddr::Protocol;
let mut ip = None;
let mut port = None;
for proto in listen_addr.iter() {
match proto {
Protocol::Ip6(i) => ip = Some(IpAddr::V6(i)),
Protocol::Tcp(p) => port = Some(p),
_ => {}
}
}
SocketAddr::new(ip.unwrap(), port.unwrap())
};
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let mut stream = TcpStream::connect(socket_addr).await.unwrap();
let src_ip: [u8; 16] = [
0x20, 0x01, 0x0d, 0xb8, 0x85, 0xa3, 0x00, 0x00, 0x00, 0x00, 0x8a, 0x2e, 0x03, 0x70,
0x73, 0x34,
];
let dst_ip: [u8; 16] = [
0x20, 0x01, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x01,
];
let proxy_header = build_proxy_v2_header_ipv6(src_ip, dst_ip, 12345, 80);
stream.write_all(&proxy_header).await.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
});
receiver.recv_timeout(Duration::from_secs(5)).unwrap();
thread::sleep(Duration::from_millis(100));
let addresses = collected.get_all();
assert!(
!addresses.is_empty(),
"Should have collected at least one address"
);
let first_addr = &addresses[0];
let ip = extract_ip_from_multiaddr(first_addr);
assert!(ip.is_some(), "Should be able to extract IP from address");
assert_eq!(
ip.unwrap().to_string(),
"2001:db8:85a3::8a2e:370:7334",
"Should use the IPv6 from PROXY protocol v2 header"
);
}
#[test]
fn test_normal_connection_without_proxy_protocol() {
let collected = CollectedAddresses::default();
let (sender, receiver) = crossbeam_channel::bounded(1);
let (addr_sender, addr_receiver) = channel::oneshot::channel::<Multiaddr>();
let collected_clone = collected.clone();
thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut service = create_service(collected_clone, sender);
rt.block_on(async move {
let listen_addr = service
.listen("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.await
.unwrap();
addr_sender.send(listen_addr).unwrap();
service.run().await
});
});
let listen_addr = futures::executor::block_on(addr_receiver).unwrap();
let socket_addr: SocketAddr = {
use tentacle::multiaddr::Protocol;
let mut ip = None;
let mut port = None;
for proto in listen_addr.iter() {
match proto {
Protocol::Ip4(i) => ip = Some(IpAddr::V4(i)),
Protocol::Tcp(p) => port = Some(p),
_ => {}
}
}
SocketAddr::new(ip.unwrap(), port.unwrap())
};
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let mut stream = TcpStream::connect(socket_addr).await.unwrap();
let non_proxy_data = [
0x00, 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, ];
stream.write_all(&non_proxy_data).await.unwrap();
tokio::time::sleep(Duration::from_millis(500)).await;
});
receiver.recv_timeout(Duration::from_secs(5)).unwrap();
thread::sleep(Duration::from_millis(100));
let addresses = collected.get_all();
assert!(
!addresses.is_empty(),
"Should have collected at least one address"
);
let first_addr = &addresses[0];
let ip = extract_ip_from_multiaddr(first_addr);
assert!(ip.is_some(), "Should be able to extract IP from address");
assert_eq!(
ip.unwrap().to_string(),
"127.0.0.1",
"Should use the socket address when no PROXY protocol is present"
);
}
#[cfg(feature = "ws")]
fn build_ws_upgrade_request_with_forwarded_for(host: &str, forwarded_ip: &str) -> String {
let ws_key = "dGhlIHNhbXBsZSBub25jZQ==";
format!(
"GET / HTTP/1.1\r\n\
Host: {}\r\n\
Upgrade: websocket\r\n\
Connection: Upgrade\r\n\
Sec-WebSocket-Key: {}\r\n\
Sec-WebSocket-Version: 13\r\n\
X-Forwarded-For: {}\r\n\
\r\n",
host, ws_key, forwarded_ip
)
}
#[cfg(feature = "ws")]
fn build_ws_upgrade_request_with_forwarded_for_and_port(
host: &str,
forwarded_ip: &str,
forwarded_port: u16,
) -> String {
let ws_key = "dGhlIHNhbXBsZSBub25jZQ==";
format!(
"GET / HTTP/1.1\r\n\
Host: {}\r\n\
Upgrade: websocket\r\n\
Connection: Upgrade\r\n\
Sec-WebSocket-Key: {}\r\n\
Sec-WebSocket-Version: 13\r\n\
X-Forwarded-For: {}\r\n\
X-Forwarded-Port: {}\r\n\
\r\n",
host, ws_key, forwarded_ip, forwarded_port
)
}
#[cfg(feature = "ws")]
#[test]
fn test_ws_x_forwarded_for() {
let collected = CollectedAddresses::default();
let (sender, receiver) = crossbeam_channel::bounded(1);
let (addr_sender, addr_receiver) = channel::oneshot::channel::<Multiaddr>();
let collected_clone = collected.clone();
thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut service = create_service(collected_clone, sender);
rt.block_on(async move {
let listen_addr = service
.listen("/ip4/127.0.0.1/tcp/0/ws".parse().unwrap())
.await
.unwrap();
addr_sender.send(listen_addr).unwrap();
service.run().await
});
});
let listen_addr = futures::executor::block_on(addr_receiver).unwrap();
let socket_addr: SocketAddr = {
use tentacle::multiaddr::Protocol;
let mut ip = None;
let mut port = None;
for proto in listen_addr.iter() {
match proto {
Protocol::Ip4(i) => ip = Some(IpAddr::V4(i)),
Protocol::Tcp(p) => port = Some(p),
_ => {}
}
}
SocketAddr::new(ip.unwrap(), port.unwrap())
};
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let mut stream = TcpStream::connect(socket_addr).await.unwrap();
let ws_request = build_ws_upgrade_request_with_forwarded_for(
&format!("127.0.0.1:{}", socket_addr.port()),
"198.51.100.178",
);
stream.write_all(ws_request.as_bytes()).await.unwrap();
let mut response = vec![0u8; 1024];
stream.read_buf(&mut response).await.unwrap();
tokio::time::sleep(Duration::from_millis(500)).await;
});
receiver.recv_timeout(Duration::from_secs(5)).unwrap();
thread::sleep(Duration::from_millis(100));
let addresses = collected.get_all();
assert!(
!addresses.is_empty(),
"Should have collected at least one address"
);
let first_addr = &addresses[0];
let ip = extract_ip_from_multiaddr(first_addr);
assert!(ip.is_some(), "Should be able to extract IP from address");
assert_eq!(
ip.unwrap().to_string(),
"198.51.100.178",
"Should use the IP from X-Forwarded-For header"
);
}
#[cfg(feature = "ws")]
#[test]
fn test_ws_without_x_forwarded_for() {
let collected = CollectedAddresses::default();
let (sender, receiver) = crossbeam_channel::bounded(1);
let (addr_sender, addr_receiver) = channel::oneshot::channel::<Multiaddr>();
let collected_clone = collected.clone();
thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut service = create_service(collected_clone, sender);
rt.block_on(async move {
let listen_addr = service
.listen("/ip4/127.0.0.1/tcp/0/ws".parse().unwrap())
.await
.unwrap();
addr_sender.send(listen_addr).unwrap();
service.run().await
});
});
let listen_addr = futures::executor::block_on(addr_receiver).unwrap();
let socket_addr: SocketAddr = {
use tentacle::multiaddr::Protocol;
let mut ip = None;
let mut port = None;
for proto in listen_addr.iter() {
match proto {
Protocol::Ip4(i) => ip = Some(IpAddr::V4(i)),
Protocol::Tcp(p) => port = Some(p),
_ => {}
}
}
SocketAddr::new(ip.unwrap(), port.unwrap())
};
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
use tokio_tungstenite::connect_async;
let ws_url = format!("ws://127.0.0.1:{}/", socket_addr.port());
connect_async(&ws_url).await.unwrap();
tokio::time::sleep(Duration::from_millis(500)).await;
});
receiver.recv_timeout(Duration::from_secs(5)).unwrap();
thread::sleep(Duration::from_millis(100));
let addresses = collected.get_all();
assert!(
!addresses.is_empty(),
"Should have collected at least one address"
);
let first_addr = &addresses[0];
let ip = extract_ip_from_multiaddr(first_addr);
assert!(ip.is_some(), "Should be able to extract IP from address");
assert_eq!(
ip.unwrap().to_string(),
"127.0.0.1",
"Should use the socket address when no X-Forwarded-For is present"
);
}
#[cfg(feature = "ws")]
#[test]
fn test_ws_x_forwarded_for_multiple_ips() {
let collected = CollectedAddresses::default();
let (sender, receiver) = crossbeam_channel::bounded(1);
let (addr_sender, addr_receiver) = channel::oneshot::channel::<Multiaddr>();
let collected_clone = collected.clone();
thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut service = create_service(collected_clone, sender);
rt.block_on(async move {
let listen_addr = service
.listen("/ip4/127.0.0.1/tcp/0/ws".parse().unwrap())
.await
.unwrap();
addr_sender.send(listen_addr).unwrap();
service.run().await
});
});
let listen_addr = futures::executor::block_on(addr_receiver).unwrap();
let socket_addr: SocketAddr = {
use tentacle::multiaddr::Protocol;
let mut ip = None;
let mut port = None;
for proto in listen_addr.iter() {
match proto {
Protocol::Ip4(i) => ip = Some(IpAddr::V4(i)),
Protocol::Tcp(p) => port = Some(p),
_ => {}
}
}
SocketAddr::new(ip.unwrap(), port.unwrap())
};
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let mut stream = TcpStream::connect(socket_addr).await.unwrap();
let ws_request = build_ws_upgrade_request_with_forwarded_for(
&format!("127.0.0.1:{}", socket_addr.port()),
"203.0.113.195, 70.41.3.18, 150.172.238.178",
);
stream.write_all(ws_request.as_bytes()).await.unwrap();
let mut response = vec![0u8; 1024];
stream.read_buf(&mut response).await.unwrap();
tokio::time::sleep(Duration::from_millis(500)).await;
});
receiver.recv_timeout(Duration::from_secs(5)).unwrap();
thread::sleep(Duration::from_millis(100));
let addresses = collected.get_all();
assert!(
!addresses.is_empty(),
"Should have collected at least one address"
);
let first_addr = &addresses[0];
let ip = extract_ip_from_multiaddr(first_addr);
assert!(ip.is_some(), "Should be able to extract IP from address");
assert_eq!(
ip.unwrap().to_string(),
"203.0.113.195",
"Should use the first IP from X-Forwarded-For header chain"
);
}
#[cfg(feature = "ws")]
#[test]
fn test_ws_x_forwarded_for_ipv6() {
let collected = CollectedAddresses::default();
let (sender, receiver) = crossbeam_channel::bounded(1);
let (addr_sender, addr_receiver) = channel::oneshot::channel::<Multiaddr>();
let collected_clone = collected.clone();
thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut service = create_service(collected_clone, sender);
rt.block_on(async move {
let listen_addr = service
.listen("/ip4/127.0.0.1/tcp/0/ws".parse().unwrap())
.await
.unwrap();
addr_sender.send(listen_addr).unwrap();
service.run().await
});
});
let listen_addr = futures::executor::block_on(addr_receiver).unwrap();
let socket_addr: SocketAddr = {
use tentacle::multiaddr::Protocol;
let mut ip = None;
let mut port = None;
for proto in listen_addr.iter() {
match proto {
Protocol::Ip4(i) => ip = Some(IpAddr::V4(i)),
Protocol::Tcp(p) => port = Some(p),
_ => {}
}
}
SocketAddr::new(ip.unwrap(), port.unwrap())
};
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let mut stream = TcpStream::connect(socket_addr).await.unwrap();
let ws_request = build_ws_upgrade_request_with_forwarded_for(
&format!("127.0.0.1:{}", socket_addr.port()),
"2001:db8:cafe::17",
);
stream.write_all(ws_request.as_bytes()).await.unwrap();
let mut response = vec![0u8; 1024];
stream.read_buf(&mut response).await.unwrap();
tokio::time::sleep(Duration::from_millis(500)).await;
});
receiver.recv_timeout(Duration::from_secs(5)).unwrap();
thread::sleep(Duration::from_millis(100));
let addresses = collected.get_all();
assert!(
!addresses.is_empty(),
"Should have collected at least one address"
);
let first_addr = &addresses[0];
let ip = extract_ip_from_multiaddr(first_addr);
assert!(ip.is_some(), "Should be able to extract IP from address");
assert_eq!(
ip.unwrap().to_string(),
"2001:db8:cafe::17",
"Should use the IPv6 from X-Forwarded-For header"
);
}
#[cfg(feature = "ws")]
fn extract_port_from_multiaddr(addr: &Multiaddr) -> Option<u16> {
use tentacle::multiaddr::Protocol;
for proto in addr.iter() {
if let Protocol::Tcp(port) = proto {
return Some(port);
}
}
None
}
#[cfg(feature = "ws")]
#[test]
fn test_ws_x_forwarded_for_with_port() {
let collected = CollectedAddresses::default();
let (sender, receiver) = crossbeam_channel::bounded(1);
let (addr_sender, addr_receiver) = channel::oneshot::channel::<Multiaddr>();
let collected_clone = collected.clone();
thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut service = create_service(collected_clone, sender);
rt.block_on(async move {
let listen_addr = service
.listen("/ip4/127.0.0.1/tcp/0/ws".parse().unwrap())
.await
.unwrap();
addr_sender.send(listen_addr).unwrap();
service.run().await
});
});
let listen_addr = futures::executor::block_on(addr_receiver).unwrap();
let socket_addr: SocketAddr = {
use tentacle::multiaddr::Protocol;
let mut ip = None;
let mut port = None;
for proto in listen_addr.iter() {
match proto {
Protocol::Ip4(i) => ip = Some(IpAddr::V4(i)),
Protocol::Tcp(p) => port = Some(p),
_ => {}
}
}
SocketAddr::new(ip.unwrap(), port.unwrap())
};
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let mut stream = TcpStream::connect(socket_addr).await.unwrap();
let ws_request = build_ws_upgrade_request_with_forwarded_for_and_port(
&format!("127.0.0.1:{}", socket_addr.port()),
"198.51.100.50",
54321,
);
stream.write_all(ws_request.as_bytes()).await.unwrap();
let mut response = vec![0u8; 1024];
stream.read_buf(&mut response).await.unwrap();
tokio::time::sleep(Duration::from_millis(500)).await;
});
receiver.recv_timeout(Duration::from_secs(5)).unwrap();
thread::sleep(Duration::from_millis(100));
let addresses = collected.get_all();
assert!(
!addresses.is_empty(),
"Should have collected at least one address"
);
let first_addr = &addresses[0];
let ip = extract_ip_from_multiaddr(first_addr);
let port = extract_port_from_multiaddr(first_addr);
assert!(ip.is_some(), "Should be able to extract IP from address");
assert!(
port.is_some(),
"Should be able to extract port from address"
);
assert_eq!(
ip.unwrap().to_string(),
"198.51.100.50",
"Should use the IP from X-Forwarded-For header"
);
assert_eq!(
port.unwrap(),
54321,
"Should use the port from X-Forwarded-Port header"
);
}