use bytes::Bytes;
use http::{Request, Uri};
use specter::fingerprint::http2::Http2Settings;
use specter::fingerprint::tls::TlsFingerprint;
use specter::transport::connector::BoringConnector;
use specter::transport::h2::{H2Connection, PseudoHeaderOrder};
use std::time::Duration;
use tokio::time::timeout;
/// Streams `https://nghttp2.org/httpbin/stream-bytes/4096` over HTTP/2 and
/// verifies that a non-empty body arrives in at least one chunk.
///
/// The test talks to a live server, so it needs outbound network access.
/// It panics (rather than skipping) when HTTP/2 is not negotiated.
#[tokio::test]
async fn test_real_streaming_nghttp2() {
    let uri: Uri = "https://nghttp2.org/httpbin/stream-bytes/4096"
        .parse()
        .unwrap();

    // TLS handshake using the `chrome_142` fingerprint.
    let connector = BoringConnector::with_fingerprint(TlsFingerprint::chrome_142());
    let tls_stream = connector
        .connect(&uri)
        .await
        .expect("TLS connection should succeed");
    assert!(
        tls_stream.is_h2(),
        "Server did not negotiate HTTP/2 - cannot test streaming"
    );

    let mut connection = H2Connection::connect(
        tls_stream,
        Http2Settings::default(),
        PseudoHeaderOrder::Chrome,
    )
    .await
    .expect("HTTP/2 connection should succeed");

    let request = Request::builder()
        .method("GET")
        .uri(&uri)
        .header(
            "user-agent",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
        )
        .header("accept", "*/*")
        .body(Bytes::new())
        .expect("Failed to build request");

    let (response, mut body_rx) = connection
        .send_request_streaming(request)
        .await
        .expect("Streaming request should succeed");
    assert_eq!(response.status(), 200, "Should get 200 OK");

    // Consumer task: tallies bytes and chunks until the channel closes.
    // Any stream error aborts the task with a panic, which surfaces through
    // the join handle below.
    let reader_task = tokio::spawn(async move {
        let mut received = 0;
        let mut chunk_total = 0;
        while let Some(item) = body_rx.recv().await {
            let chunk = item.unwrap_or_else(|e| panic!("Stream error: {}", e));
            received += chunk.len();
            chunk_total += 1;
        }
        (received, chunk_total)
    });

    // Driver task: keeps reading frames until the connection reports it is
    // done (`Ok(false)`) or fails.
    // NOTE(review): presumably chunks only reach `body_rx` while this loop is
    // running - confirm against `H2Connection`.
    let driver_task = tokio::spawn(async move {
        loop {
            match connection.read_streaming_frames().await {
                Ok(true) => {}
                Ok(false) => break,
                Err(e) => {
                    tracing::debug!("Stream reader ended: {}", e);
                    break;
                }
            }
        }
    });

    // Wait for both tasks, bounded by a 30 second budget.
    let join_both = async {
        // The driver's own outcome is not asserted on; only the reader's is.
        let _ = driver_task.await;
        reader_task.await.expect("Reader task should complete")
    };
    let (total_bytes, chunk_count) = timeout(Duration::from_secs(30), join_both)
        .await
        .expect("Streaming should complete within 30 seconds");

    assert!(total_bytes > 0, "Should have received some bytes");
    assert!(
        chunk_count >= 1,
        "Should have received at least 1 chunk (got {} chunks, {} bytes)",
        chunk_count,
        total_bytes
    );
    tracing::info!(
        "Streaming test passed: received {} bytes in {} chunks",
        total_bytes,
        chunk_count
    );
}
/// Streams `https://cloudflare.com/cdn-cgi/trace` over HTTP/2 and checks that
/// the collected body contains trace fields (`fl=` or `h=`).
///
/// The test talks to a live server, so it needs outbound network access.
/// When HTTP/2 is not negotiated the test logs a message and returns early
/// instead of failing.
#[tokio::test]
async fn test_real_streaming_cloudflare_trace() {
    let fp = TlsFingerprint::chrome_142();
    let connector = BoringConnector::with_fingerprint(fp);
    let settings = Http2Settings::default();
    let uri: Uri = "https://cloudflare.com/cdn-cgi/trace".parse().unwrap();

    let stream = connector
        .connect(&uri)
        .await
        .expect("TLS connection should succeed");
    if !stream.is_h2() {
        tracing::info!("Cloudflare did not negotiate HTTP/2, skipping streaming test");
        return;
    }

    let mut h2_conn = H2Connection::connect(stream, settings, PseudoHeaderOrder::Chrome)
        .await
        .expect("HTTP/2 connection should succeed");

    let request = Request::builder()
        .method("GET")
        .uri(&uri)
        .header(
            "user-agent",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
        )
        .header("accept", "*/*")
        .body(Bytes::new())
        .expect("Failed to build request");

    let (response, mut rx) = h2_conn
        .send_request_streaming(request)
        .await
        .expect("Streaming request should succeed");
    assert_eq!(response.status(), 200, "Should get 200 OK");

    // Consumer task: concatenates every chunk; a stream error panics the task
    // and is reported through the join handle below.
    let reader_handle = tokio::spawn(async move {
        let mut body = Vec::new();
        while let Some(result) = rx.recv().await {
            match result {
                Ok(chunk) => body.extend_from_slice(&chunk),
                Err(e) => panic!("Stream error: {}", e),
            }
        }
        body
    });

    // Driver task: keeps reading frames until the connection reports it is
    // done (`Ok(false)`) or fails.
    let driver_handle = tokio::spawn(async move {
        loop {
            match h2_conn.read_streaming_frames().await {
                Ok(true) => continue,
                Ok(false) => break,
                Err(e) => {
                    // Log instead of dropping the error so a failed run can
                    // be diagnosed, matching `test_real_streaming_nghttp2`.
                    tracing::debug!("Stream reader ended: {}", e);
                    break;
                }
            }
        }
    });

    let body = timeout(Duration::from_secs(10), async {
        // The driver's own outcome is not asserted on; only the reader's is.
        let _ = driver_handle.await;
        reader_handle.await.expect("Reader should complete")
    })
    .await
    .expect("Should complete within 10 seconds");

    let body_str = String::from_utf8_lossy(&body);
    assert!(
        body_str.contains("fl=") || body_str.contains("h="),
        "Cloudflare trace should contain connection info"
    );
    tracing::info!("Cloudflare streaming test passed: {} bytes", body.len());
}
/// Streams `https://nghttp2.org/httpbin/stream-bytes/65536` over HTTP/2 and
/// verifies that the body is delivered incrementally: at least two chunks
/// totalling at least 60000 bytes.
///
/// The test talks to a live server, so it needs outbound network access.
/// It panics (rather than skipping) when HTTP/2 is not negotiated.
#[tokio::test]
async fn test_streaming_larger_response() {
    let fp = TlsFingerprint::chrome_142();
    let connector = BoringConnector::with_fingerprint(fp);
    let settings = Http2Settings::default();
    let uri: Uri = "https://nghttp2.org/httpbin/stream-bytes/65536"
        .parse()
        .unwrap();

    let stream = connector
        .connect(&uri)
        .await
        .expect("TLS connection should succeed");
    if !stream.is_h2() {
        panic!("Server did not negotiate HTTP/2");
    }

    let mut h2_conn = H2Connection::connect(stream, settings, PseudoHeaderOrder::Chrome)
        .await
        .expect("HTTP/2 connection should succeed");

    let request = Request::builder()
        .method("GET")
        .uri(&uri)
        .header(
            "user-agent",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
        )
        .header("accept", "*/*")
        .body(Bytes::new())
        .expect("Failed to build request");

    let (response, mut rx) = h2_conn
        .send_request_streaming(request)
        .await
        .expect("Streaming request should succeed");
    assert_eq!(response.status(), 200);

    // Consumer task: records the size of every chunk so both the chunk count
    // and the byte total can be checked afterwards.
    let reader_handle = tokio::spawn(async move {
        let mut chunk_sizes: Vec<usize> = Vec::new();
        while let Some(result) = rx.recv().await {
            match result {
                Ok(chunk) => chunk_sizes.push(chunk.len()),
                Err(e) => panic!("Stream error: {}", e),
            }
        }
        chunk_sizes
    });

    // Driver task: keeps reading frames until the connection reports it is
    // done (`Ok(false)`) or fails.
    let driver_handle = tokio::spawn(async move {
        loop {
            match h2_conn.read_streaming_frames().await {
                Ok(true) => continue,
                Ok(false) => break,
                Err(e) => {
                    // Log instead of dropping the error so a short or failed
                    // transfer can be diagnosed, matching
                    // `test_real_streaming_nghttp2`.
                    tracing::debug!("Stream reader ended: {}", e);
                    break;
                }
            }
        }
    });

    let chunk_sizes = timeout(Duration::from_secs(60), async {
        // The driver's own outcome is not asserted on; only the reader's is.
        let _ = driver_handle.await;
        reader_handle.await.expect("Reader should complete")
    })
    .await
    .expect("Should complete within 60 seconds");

    let total_bytes: usize = chunk_sizes.iter().sum();
    let chunk_count = chunk_sizes.len();
    assert!(
        chunk_count >= 2,
        "64KB response should arrive in multiple chunks (got {} chunks)",
        chunk_count
    );
    assert!(
        total_bytes >= 60000,
        "Should receive at least 60KB (got {} bytes)",
        total_bytes
    );
    tracing::info!(
        "Large streaming test passed: {} bytes in {} chunks",
        total_bytes,
        chunk_count
    );
    tracing::debug!("Chunk sizes: {:?}", chunk_sizes);
}
#[tokio::test]
async fn test_streaming_headers_available_immediately() {
let fp = TlsFingerprint::chrome_142();
let connector = BoringConnector::with_fingerprint(fp);
let settings = Http2Settings::default();
let uri: Uri = "https://nghttp2.org/httpbin/headers".parse().unwrap();
let stream = connector
.connect(&uri)
.await
.expect("TLS connection should succeed");
if !stream.is_h2() {
panic!("Server did not negotiate HTTP/2");
}
let mut h2_conn = H2Connection::connect(stream, settings, PseudoHeaderOrder::Chrome)
.await
.expect("HTTP/2 connection should succeed");
let request = Request::builder()
.method("GET")
.uri(&uri)
.header("user-agent", "specter/test")
.header("accept", "application/json")
.header("x-custom-header", "test-value")
.body(Bytes::new())
.expect("Failed to build request");
let (response, mut rx) = h2_conn
.send_request_streaming(request)
.await
.expect("Streaming request should succeed");
assert_eq!(response.status(), 200, "Should get 200 OK");
let content_type = response
.headers()
.get("content-type")
.map(|v| v.to_str().unwrap_or(""));
assert!(
content_type.is_some(),
"Content-Type header should be present"
);
let reader_handle = tokio::spawn(async move {
let mut body = Vec::new();
while let Some(result) = rx.recv().await {
if let Ok(chunk) = result {
body.extend_from_slice(&chunk);
}
}
body
});
let driver_handle = tokio::spawn(async move {
loop {
match h2_conn.read_streaming_frames().await {
Ok(true) => continue,
Ok(false) => break,
Err(_) => break,
}
}
});
let body = timeout(Duration::from_secs(10), async {
let _ = driver_handle.await;
reader_handle.await.expect("Reader should complete")
})
.await
.expect("Should complete within 10 seconds");
let body_str = String::from_utf8_lossy(&body);
let json: serde_json::Value = serde_json::from_str(&body_str).expect("Should be valid JSON");
if let Some(headers) = json.get("headers") {
assert!(
headers.get("X-Custom-Header").is_some() || headers.get("x-custom-header").is_some(),
"Custom header should be echoed back"
);
}
tracing::info!("Headers streaming test passed");
}