use ntrip_core::{NtripClient, NtripConfig};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
#[tokio::test]
async fn test_header_overread_preserved() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let server = tokio::spawn(async move {
let (mut socket, _) = listener.accept().await.unwrap();
let mut buf = [0u8; 1024];
let _ = socket.read(&mut buf).await.unwrap();
let response = b"HTTP/1.1 200 OK\r\nContent-Type: application/octet-stream\r\n\r\n\xD3\x00\x13RTCM_DATA_HERE";
socket.write_all(response).await.unwrap();
let _ = socket.flush().await;
tokio::time::sleep(Duration::from_secs(2)).await;
});
let config = NtripConfig::new(addr.ip().to_string(), addr.port(), "TEST")
.with_timeout(5)
.with_read_timeout(5)
.without_reconnect();
let mut client = NtripClient::new(config).unwrap();
client.connect().await.unwrap();
let mut buf = [0u8; 256];
let n = tokio::time::timeout(Duration::from_secs(1), client.read_chunk(&mut buf))
.await
.expect("read timeout")
.expect("read error");
assert!(n > 0, "Should receive over-read bytes");
assert_eq!(buf[0], 0xD3, "First byte should be RTCM preamble");
server.abort();
}
#[tokio::test]
async fn test_chunked_encoding() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let server = tokio::spawn(async move {
let (mut socket, _) = listener.accept().await.unwrap();
let mut buf = [0u8; 1024];
let _ = socket.read(&mut buf).await.unwrap();
let response = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nNtrip-Version: Ntrip/2.0\r\n\r\n5\r\nHELLO\r\n5\r\nWORLD\r\n";
socket.write_all(response).await.unwrap();
let _ = socket.flush().await;
tokio::time::sleep(Duration::from_secs(2)).await;
});
let config = NtripConfig::new(addr.ip().to_string(), addr.port(), "TEST")
.with_timeout(5)
.with_read_timeout(5)
.without_reconnect();
let mut client = NtripClient::new(config).unwrap();
client.connect().await.unwrap();
let mut buf = [0u8; 256];
let n = tokio::time::timeout(Duration::from_secs(1), client.read_chunk(&mut buf))
.await
.expect("read timeout")
.expect("read error");
assert_eq!(n, 5, "First chunk should be 5 bytes");
assert_eq!(&buf[..5], b"HELLO");
let n = tokio::time::timeout(Duration::from_secs(1), client.read_chunk(&mut buf))
.await
.expect("read timeout")
.expect("read error");
assert_eq!(n, 5, "Second chunk should be 5 bytes");
assert_eq!(&buf[..5], b"WORLD");
server.abort();
}
#[tokio::test]
async fn test_chunk_extension_tolerance() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let server = tokio::spawn(async move {
let (mut socket, _) = listener.accept().await.unwrap();
let mut buf = [0u8; 1024];
let _ = socket.read(&mut buf).await.unwrap();
let response =
b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\na;name=value\r\n0123456789\r\n";
socket.write_all(response).await.unwrap();
let _ = socket.flush().await;
tokio::time::sleep(Duration::from_secs(2)).await;
});
let config = NtripConfig::new(addr.ip().to_string(), addr.port(), "TEST")
.with_timeout(5)
.with_read_timeout(5)
.without_reconnect();
let mut client = NtripClient::new(config).unwrap();
client.connect().await.unwrap();
let mut buf = [0u8; 256];
let n = tokio::time::timeout(Duration::from_secs(1), client.read_chunk(&mut buf))
.await
.expect("read timeout")
.expect("read error");
assert_eq!(n, 10, "Should parse chunk size ignoring extension");
assert_eq!(&buf[..10], b"0123456789");
server.abort();
}
#[tokio::test]
async fn test_status_code_parsing() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let server = tokio::spawn(async move {
let (mut socket, _) = listener.accept().await.unwrap();
let mut buf = [0u8; 1024];
let _ = socket.read(&mut buf).await.unwrap();
let response = b"HTTP/1.1 401 Unauthorized\r\nWWW-Authenticate: Basic\r\n\r\n";
socket.write_all(response).await.unwrap();
});
let config = NtripConfig::new(addr.ip().to_string(), addr.port(), "TEST")
.with_timeout(5)
.without_reconnect();
let mut client = NtripClient::new(config).unwrap();
let result = client.connect().await;
assert!(result.is_err(), "Should fail on 401");
let err = result.unwrap_err();
assert!(
format!("{}", err).contains("Authentication"),
"Error should mention authentication"
);
server.await.unwrap();
}
#[tokio::test]
async fn test_icy_response() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let server = tokio::spawn(async move {
let (mut socket, _) = listener.accept().await.unwrap();
let mut buf = [0u8; 1024];
let _ = socket.read(&mut buf).await.unwrap();
socket
.write_all(b"ICY 200 OK\r\n\xD3\x00\x10")
.await
.unwrap();
let _ = socket.flush().await;
tokio::time::sleep(Duration::from_secs(2)).await;
});
let config = NtripConfig::new(addr.ip().to_string(), addr.port(), "TEST")
.with_timeout(5)
.with_read_timeout(5)
.without_reconnect();
let mut client = NtripClient::new(config).unwrap();
client.connect().await.unwrap();
let mut buf = [0u8; 256];
let n = tokio::time::timeout(Duration::from_secs(1), client.read_chunk(&mut buf))
.await
.expect("read timeout")
.expect("read error");
assert!(n >= 3, "Should receive over-read bytes after ICY");
assert_eq!(buf[0], 0xD3, "First byte should be RTCM preamble");
server.abort();
}
#[tokio::test]
async fn test_zero_read_timeout_no_immediate_error() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let server = tokio::spawn(async move {
let (mut socket, _) = listener.accept().await.unwrap();
let mut buf = [0u8; 1024];
let _ = socket.read(&mut buf).await.unwrap();
socket.write_all(b"HTTP/1.1 200 OK\r\n\r\n").await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
socket.write_all(b"DATA").await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
});
let config = NtripConfig::new(addr.ip().to_string(), addr.port(), "TEST")
.with_timeout(5)
.with_read_timeout(0) .without_reconnect();
let mut client = NtripClient::new(config).unwrap();
client.connect().await.unwrap();
let mut buf = [0u8; 256];
let result = tokio::time::timeout(Duration::from_secs(1), client.read_chunk(&mut buf)).await;
assert!(
result.is_ok(),
"Should not immediately timeout with read_timeout_secs=0"
);
let n = result.unwrap().unwrap();
assert_eq!(n, 4);
assert_eq!(&buf[..4], b"DATA");
server.await.unwrap();
}
#[test]
fn test_header_injection_validation() {
let config = NtripConfig::new("example.com", 2101, "TEST\r\nX-Injected: value");
assert!(
config.validate().is_err(),
"Mountpoint with CRLF should fail validation"
);
let config =
NtripConfig::new("example.com", 2101, "TEST").with_credentials("user\nname", "pass");
assert!(
config.validate().is_err(),
"Username with newline should fail validation"
);
let config = NtripConfig::new("example\r.com", 2101, "TEST");
assert!(
config.validate().is_err(),
"Host with CR should fail validation"
);
let config =
NtripConfig::new("example.com", 2101, "TEST").with_credentials("user@test.com", "pass123");
assert!(config.validate().is_ok(), "Valid config should pass");
}
#[test]
fn test_password_redacted_in_debug() {
let config =
NtripConfig::new("example.com", 2101, "TEST").with_credentials("user", "secret_password");
let debug_output = format!("{:?}", config);
assert!(
!debug_output.contains("secret_password"),
"Debug output should not contain password"
);
assert!(
debug_output.contains("[REDACTED]"),
"Debug output should show [REDACTED]"
);
assert!(
debug_output.contains("user"),
"Debug output should still show username"
);
}