mod common;
use common::test_server::{RequestRule, ServerConfig, TestServer};
use futures_util::StreamExt;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use ripcurl::protocol::http::HttpSourceProtocol;
use ripcurl::protocol::{SourceProtocol, SourceReader, TransferError};
use std::pin::pin;
use std::time::Duration;
/// Drain a `SourceReader`'s byte stream into one contiguous buffer,
/// propagating the first stream error encountered.
async fn collect_reader(reader: impl SourceReader) -> Result<Vec<u8>, TransferError> {
    let mut stream = pin!(reader.stream_bytes());
    let mut collected = Vec::new();
    while let Some(chunk) = stream.next().await {
        collected.extend_from_slice(&chunk?);
    }
    Ok(collected)
}
#[tokio::test]
async fn test_basic_download() {
    // A single range-capable response should yield the full file contents
    // with a known total size and a start offset of zero.
    let content_size = 10_000;
    let rules = vec![RequestRule::Serve {
        support_ranges: true,
    }];
    let server = TestServer::start(ServerConfig::new(content_size, rules)).await;
    let mut source = HttpSourceProtocol::new(HeaderMap::default()).unwrap();
    let (reader, offset) = source.get_reader(server.url("/file"), 0).await.unwrap();
    assert_eq!(offset.offset, 0);
    assert_eq!(offset.total_size, Some(content_size as u64));
    let body = collect_reader(reader).await.unwrap();
    assert_eq!(body.len(), content_size);
    assert_eq!(body, common::test_server::generate_content(content_size));
}
#[tokio::test]
async fn test_range_request_resume() {
    // Resuming mid-file should send a Range header and deliver only the tail.
    let content_size = 10_000;
    let config = ServerConfig::new(content_size, vec![]).with_fallback(RequestRule::Serve {
        support_ranges: true,
    });
    let server = TestServer::start(config).await;
    let mut source = HttpSourceProtocol::new(HeaderMap::default()).unwrap();

    // Full download first.
    let (reader, offset) = source.get_reader(server.url("/file"), 0).await.unwrap();
    assert_eq!(offset.offset, 0);
    let full = collect_reader(reader).await.unwrap();
    assert_eq!(full.len(), content_size);

    // Resume from the midpoint.
    let (reader, offset) = source.get_reader(server.url("/file"), 5000).await.unwrap();
    assert_eq!(offset.offset, 5000);
    let tail = collect_reader(reader).await.unwrap();
    assert_eq!(tail.len(), 5000);
    let expected = common::test_server::generate_content(content_size);
    assert_eq!(tail, &expected[5000..]);

    // The resume request must have carried a Range header.
    let headers = server.last_request_headers("/file").unwrap();
    assert!(headers.get("range").is_some());
}
#[tokio::test]
async fn test_server_no_range_support() {
    // When the server ignores Range requests, a resume attempt must fall
    // back to a full restart from offset 0 and re-deliver the whole body.
    let content_size = 1000;
    let config =
        ServerConfig::new(content_size, vec![]).with_fallback(RequestRule::ServeIgnoringRange);
    let server = TestServer::start(config).await;
    let mut source = HttpSourceProtocol::new(HeaderMap::default()).unwrap();
    let (reader, offset) = source.get_reader(server.url("/file"), 0).await.unwrap();
    assert_eq!(offset.offset, 0);
    let _ = collect_reader(reader).await.unwrap();
    // Ask to resume at 500; the server cannot honor it.
    let (reader, offset) = source.get_reader(server.url("/file"), 500).await.unwrap();
    assert_eq!(offset.offset, 0);
    let body = collect_reader(reader).await.unwrap();
    assert_eq!(body.len(), content_size);
}
#[tokio::test]
async fn test_503_then_success() {
let content_size = 1000;
let server = TestServer::start(ServerConfig::new(
content_size,
vec![
RequestRule::Error {
status: 503,
retry_after: Some("0".to_string()),
},
RequestRule::Error {
status: 503,
retry_after: Some("0".to_string()),
},
RequestRule::Serve {
support_ranges: true,
},
],
))
.await;
let mut source = HttpSourceProtocol::new(HeaderMap::default()).unwrap();
let err = source.get_reader(server.url("/file"), 0).await.unwrap_err();
assert!(matches!(err, TransferError::Transient { .. }));
let err = source.get_reader(server.url("/file"), 0).await.unwrap_err();
assert!(matches!(err, TransferError::Transient { .. }));
let (reader, _) = source.get_reader(server.url("/file"), 0).await.unwrap();
let bytes = collect_reader(reader).await.unwrap();
assert_eq!(bytes.len(), content_size);
assert_eq!(server.request_count("/file"), 3);
}
#[tokio::test]
async fn test_429_with_retry_after() {
    // A 429 carrying `Retry-After: 1` must map to a Transient error whose
    // minimum retry delay reflects the header value.
    let rules = vec![
        RequestRule::Error {
            status: 429,
            retry_after: Some("1".to_string()),
        },
        RequestRule::Serve {
            support_ranges: true,
        },
    ];
    let server = TestServer::start(ServerConfig::new(100, rules)).await;
    let mut source = HttpSourceProtocol::new(HeaderMap::default()).unwrap();
    match source.get_reader(server.url("/file"), 0).await.unwrap_err() {
        TransferError::Transient {
            minimum_retry_delay,
            ..
        } => assert_eq!(minimum_retry_delay, Duration::from_secs(1)),
        other @ TransferError::Permanent { .. } => {
            panic!("expected Transient, got: {other:?}")
        }
    }
    // The follow-up request succeeds normally.
    let (reader, _) = source.get_reader(server.url("/file"), 0).await.unwrap();
    let body = collect_reader(reader).await.unwrap();
    assert_eq!(body.len(), 100);
}
#[tokio::test]
async fn test_connection_drop_mid_stream() {
    // The server drops the connection after 5000 of 10000 bytes. The client
    // may either fail the initial request or hand back a reader; a reader
    // must then produce a stream error or a truncated body.
    let content_size = 10_000;
    let server = TestServer::start(ServerConfig::new(
        content_size,
        vec![RequestRule::PartialThenDrop {
            bytes_before_drop: 5000,
        }],
    ))
    .await;
    let mut source = HttpSourceProtocol::new(HeaderMap::default()).unwrap();
    match source.get_reader(server.url("/file"), 0).await {
        Ok((reader, _)) => {
            let mut stream = pin!(reader.stream_bytes());
            let mut total = 0u64;
            let mut got_error = false;
            while let Some(chunk) = stream.next().await {
                match chunk {
                    Ok(bytes) => total += bytes.len() as u64,
                    Err(_) => {
                        got_error = true;
                        break;
                    }
                }
            }
            assert!(
                got_error || total < content_size as u64,
                "should either get a stream error or fewer bytes ({total}) than expected ({content_size})"
            );
        }
        Err(err) => {
            assert!(
                matches!(
                    err,
                    TransferError::Transient { .. } | TransferError::Permanent { .. }
                ),
                "unexpected error variant: {err:?}"
            );
        }
    }
}
#[tokio::test]
async fn test_404_permanent() {
let server = TestServer::start(ServerConfig::new(
100,
vec![RequestRule::Error {
status: 404,
retry_after: None,
}],
))
.await;
let mut source = HttpSourceProtocol::new(HeaderMap::default()).unwrap();
let err = source.get_reader(server.url("/file"), 0).await.unwrap_err();
assert!(matches!(err, TransferError::Permanent { .. }));
}
#[tokio::test]
async fn test_etag_included_on_resume() {
    // A resume request must carry If-Match with the ETag observed on the
    // first response.
    let content_size = 1000;
    let config = ServerConfig::new(content_size, vec![])
        .with_fallback(RequestRule::Serve {
            support_ranges: true,
        })
        .with_etag(Some("\"my-etag-123\"".to_string()));
    let server = TestServer::start(config).await;
    let mut source = HttpSourceProtocol::new(HeaderMap::default()).unwrap();

    // Initial full download, then a resume at the halfway point.
    let (reader, _) = source.get_reader(server.url("/file"), 0).await.unwrap();
    let _ = collect_reader(reader).await.unwrap();
    let (reader, _) = source.get_reader(server.url("/file"), 500).await.unwrap();
    let _ = collect_reader(reader).await.unwrap();

    let headers = server.last_request_headers("/file").unwrap();
    let if_match = headers
        .get("if-match")
        .expect("If-Match header should be present on resume")
        .to_str()
        .unwrap();
    assert_eq!(if_match, "\"my-etag-123\"");
}
#[tokio::test]
async fn test_redirect_chain() {
    // 302 -> 301 -> 200: the client should follow both hops transparently
    // and deliver the final resource intact.
    let content_size = 500;
    let config = ServerConfig::new(content_size, vec![])
        .with_path_rules(
            "/start",
            vec![RequestRule::Redirect {
                status: 302,
                location: "/middle".to_string(),
            }],
        )
        .with_path_rules(
            "/middle",
            vec![RequestRule::Redirect {
                status: 301,
                location: "/final".to_string(),
            }],
        )
        .with_path_rules(
            "/final",
            vec![RequestRule::Serve {
                support_ranges: true,
            }],
        );
    let server = TestServer::start(config).await;
    let mut source = HttpSourceProtocol::new(HeaderMap::default()).unwrap();
    let (reader, offset) = source.get_reader(server.url("/start"), 0).await.unwrap();
    assert_eq!(offset.offset, 0);
    let body = collect_reader(reader).await.unwrap();
    assert_eq!(body.len(), content_size);
    assert_eq!(body, common::test_server::generate_content(content_size));
}
#[tokio::test]
async fn test_redirect_to_different_server() {
    // A 302 pointing at a different server should be followed and the
    // payload fetched intact from the redirect target.
    let content_size = 500;
    let server2 = TestServer::start(ServerConfig::new(
        content_size,
        vec![RequestRule::Serve {
            support_ranges: true,
        }],
    ))
    .await;
    let redirect = RequestRule::Redirect {
        status: 302,
        location: server2.url("/file").to_string(),
    };
    let server1 =
        TestServer::start(ServerConfig::new(0, vec![]).with_path_rules("/download", vec![redirect]))
            .await;
    let mut source = HttpSourceProtocol::new(HeaderMap::default()).unwrap();
    let (reader, _) = source
        .get_reader(server1.url("/download"), 0)
        .await
        .unwrap();
    let body = collect_reader(reader).await.unwrap();
    assert_eq!(body.len(), content_size);
    assert_eq!(body, common::test_server::generate_content(content_size));
}
#[tokio::test]
async fn test_etag_change_after_range_failure_does_not_panic() {
let content_size = 1000;
let server = TestServer::start(
ServerConfig::new(
content_size,
vec![
RequestRule::WithHeaders {
etag: Some("\"etag-v1\"".to_string()),
last_modified: None,
then: Box::new(RequestRule::Serve {
support_ranges: true,
}),
},
RequestRule::WithHeaders {
etag: Some("\"etag-v1\"".to_string()),
last_modified: None,
then: Box::new(RequestRule::ServeIgnoringRange),
},
RequestRule::WithHeaders {
etag: Some("\"etag-v2\"".to_string()),
last_modified: None,
then: Box::new(RequestRule::Serve {
support_ranges: true,
}),
},
],
)
.with_etag(None)
.with_last_modified(None),
)
.await;
let mut source = HttpSourceProtocol::new(HeaderMap::default()).unwrap();
let (reader, offset) = source.get_reader(server.url("/file"), 0).await.unwrap();
assert_eq!(offset.offset, 0);
let _ = collect_reader(reader).await.unwrap();
let (reader, offset) = source.get_reader(server.url("/file"), 500).await.unwrap();
assert_eq!(offset.offset, 0);
let _ = collect_reader(reader).await.unwrap();
let (reader, offset) = source.get_reader(server.url("/file"), 0).await.unwrap();
assert_eq!(offset.offset, 0);
let bytes = collect_reader(reader).await.unwrap();
assert_eq!(bytes.len(), content_size);
}
#[tokio::test]
async fn test_206_reports_total_size_from_content_range() {
    // On a 206 response, total_size must come from the Content-Range total,
    // not from the partial body's Content-Length.
    let content_size = 10_000;
    let config = ServerConfig::new(content_size, vec![]).with_fallback(RequestRule::Serve {
        support_ranges: true,
    });
    let server = TestServer::start(config).await;
    let mut source = HttpSourceProtocol::new(HeaderMap::default()).unwrap();
    let (reader, _) = source.get_reader(server.url("/file"), 0).await.unwrap();
    let _ = collect_reader(reader).await.unwrap();
    let (_reader, offset) = source.get_reader(server.url("/file"), 5000).await.unwrap();
    assert_eq!(offset.offset, 5000);
    assert_eq!(
        offset.total_size,
        Some(content_size as u64),
        "total_size should reflect the full resource size from Content-Range, not the partial Content-Length"
    );
}
#[tokio::test]
async fn test_disappearing_etag_detected_on_resume() {
    // If the ETag vanishes between the original download and the resume,
    // the client must treat the resource as changed and restart from 0.
    let content_size = 1000;
    let tagged = |etag: Option<&str>| RequestRule::WithHeaders {
        etag: etag.map(str::to_string),
        last_modified: None,
        then: Box::new(RequestRule::Serve {
            support_ranges: true,
        }),
    };
    let rules = vec![
        tagged(Some("\"etag-v1\"")),
        tagged(None),
        tagged(Some("\"etag-v2\"")),
    ];
    let server = TestServer::start(
        ServerConfig::new(content_size, rules)
            .with_etag(None)
            .with_last_modified(None),
    )
    .await;
    let mut source = HttpSourceProtocol::new(HeaderMap::default()).unwrap();
    let (reader, _) = source.get_reader(server.url("/file"), 0).await.unwrap();
    let _ = collect_reader(reader).await.unwrap();
    let (_reader, offset) = source.get_reader(server.url("/file"), 500).await.unwrap();
    assert_eq!(
        offset.offset, 0,
        "should restart from 0 after detecting disappearing ETag"
    );
    assert_eq!(
        server.request_count("/file"),
        3,
        "expected: initial + failed resume + restart"
    );
}
#[tokio::test]
async fn test_custom_headers_sent() {
    // User-supplied headers must be forwarded on the outgoing request.
    let content_size = 500;
    let server = TestServer::start(ServerConfig::new(
        content_size,
        vec![RequestRule::Serve {
            support_ranges: true,
        }],
    ))
    .await;
    let mut custom = HeaderMap::new();
    custom.insert(
        HeaderName::from_static("x-test"),
        HeaderValue::from_static("hello"),
    );
    let mut source = HttpSourceProtocol::new(custom).unwrap();
    let (reader, _) = source.get_reader(server.url("/file"), 0).await.unwrap();
    let _ = collect_reader(reader).await.unwrap();
    let seen = server.last_request_headers("/file").unwrap();
    assert_eq!(seen.get("x-test").unwrap().to_str().unwrap(), "hello");
}
#[tokio::test]
async fn test_custom_headers_sent_on_resume() {
    // Custom headers must persist across a resume request, not just the
    // initial download.
    let content_size = 1000;
    let config = ServerConfig::new(content_size, vec![]).with_fallback(RequestRule::Serve {
        support_ranges: true,
    });
    let server = TestServer::start(config).await;
    let mut custom = HeaderMap::new();
    custom.insert(
        HeaderName::from_static("x-custom"),
        HeaderValue::from_static("persist"),
    );
    let mut source = HttpSourceProtocol::new(custom).unwrap();
    let (reader, _) = source.get_reader(server.url("/file"), 0).await.unwrap();
    let _ = collect_reader(reader).await.unwrap();
    let (reader, offset) = source.get_reader(server.url("/file"), 500).await.unwrap();
    assert_eq!(offset.offset, 500);
    let _ = collect_reader(reader).await.unwrap();
    let seen = server.last_request_headers("/file").unwrap();
    assert_eq!(seen.get("x-custom").unwrap().to_str().unwrap(), "persist");
}
#[tokio::test]
async fn test_auth_header_stripped_on_cross_host_redirect() {
    // Authorization must reach the origin (server1) but be dropped when the
    // request is redirected to a different host (server2).
    let content_size = 500;
    let server2 = TestServer::start(ServerConfig::new(
        content_size,
        vec![RequestRule::Serve {
            support_ranges: true,
        }],
    ))
    .await;
    let redirect = RequestRule::Redirect {
        status: 302,
        location: server2.url("/file").to_string(),
    };
    let server1 =
        TestServer::start(ServerConfig::new(0, vec![]).with_path_rules("/download", vec![redirect]))
            .await;
    let mut auth_headers = HeaderMap::new();
    auth_headers.insert(
        reqwest::header::AUTHORIZATION,
        HeaderValue::from_static("Bearer secret-token"),
    );
    let mut source = HttpSourceProtocol::new(auth_headers).unwrap();
    let (reader, _) = source
        .get_reader(server1.url("/download"), 0)
        .await
        .unwrap();
    let body = collect_reader(reader).await.unwrap();
    assert_eq!(body.len(), content_size);
    let s1_headers = server1.last_request_headers("/download").unwrap();
    assert!(
        s1_headers.get("authorization").is_some(),
        "server1 should receive the Authorization header"
    );
    let s2_headers = server2.last_request_headers("/file").unwrap();
    assert!(
        s2_headers.get("authorization").is_none(),
        "server2 should NOT receive the Authorization header after cross-origin redirect"
    );
}
#[tokio::test]
async fn test_auth_header_preserved_on_same_host_redirect() {
    // A same-host redirect is trusted: Authorization must follow it.
    let content_size = 500;
    let config = ServerConfig::new(content_size, vec![])
        .with_path_rules(
            "/start",
            vec![RequestRule::Redirect {
                status: 302,
                location: "/final".to_string(),
            }],
        )
        .with_path_rules(
            "/final",
            vec![RequestRule::Serve {
                support_ranges: true,
            }],
        );
    let server = TestServer::start(config).await;
    let mut auth_headers = HeaderMap::new();
    auth_headers.insert(
        reqwest::header::AUTHORIZATION,
        HeaderValue::from_static("Bearer keep-me"),
    );
    let mut source = HttpSourceProtocol::new(auth_headers).unwrap();
    let (reader, _) = source.get_reader(server.url("/start"), 0).await.unwrap();
    let body = collect_reader(reader).await.unwrap();
    assert_eq!(body.len(), content_size);
    let final_headers = server.last_request_headers("/final").unwrap();
    let auth = final_headers
        .get("authorization")
        .expect("Authorization header should be preserved on same-host redirect")
        .to_str()
        .unwrap();
    assert_eq!(auth, "Bearer keep-me");
}
#[tokio::test]
async fn test_resume_headers_survive_cross_server_redirect() {
    // On a resume through a cross-origin redirect, the conditional headers
    // (Range, If-Match, If-Unmodified-Since) must reach the final server,
    // while Authorization must not leak across origins.
    let content_size = 1000;
    let cdn_config = ServerConfig::new(content_size, vec![])
        .with_fallback(RequestRule::Serve {
            support_ranges: true,
        })
        .with_etag(Some("\"cdn-etag\"".to_string()))
        .with_last_modified(Some("Sun, 01 Jan 2025 00:00:00 GMT".to_string()));
    let server2 = TestServer::start(cdn_config).await;
    let server1 = TestServer::start(ServerConfig::new(0, vec![]).with_fallback(
        RequestRule::Redirect {
            status: 302,
            location: server2.url("/file").to_string(),
        },
    ))
    .await;
    let mut auth_headers = HeaderMap::new();
    auth_headers.insert(
        reqwest::header::AUTHORIZATION,
        HeaderValue::from_static("Bearer secret-token"),
    );
    let mut source = HttpSourceProtocol::new(auth_headers).unwrap();

    // Initial full download through the redirect.
    let (reader, offset) = source
        .get_reader(server1.url("/download"), 0)
        .await
        .unwrap();
    assert_eq!(offset.offset, 0);
    let _ = collect_reader(reader).await.unwrap();

    // Resume from the midpoint, again through the redirect.
    let (reader, offset) = source
        .get_reader(server1.url("/download"), 500)
        .await
        .unwrap();
    assert_eq!(offset.offset, 500);
    let remaining = collect_reader(reader).await.unwrap();
    assert_eq!(remaining.len(), 500);
    let expected = common::test_server::generate_content(content_size);
    assert_eq!(remaining, &expected[500..]);

    let s2_headers = server2.last_request_headers("/file").unwrap();
    assert!(
        s2_headers.get("range").is_some(),
        "Range header should survive cross-origin redirect"
    );
    assert_eq!(
        s2_headers.get("if-match").unwrap().to_str().unwrap(),
        "\"cdn-etag\"",
        "If-Match should survive cross-origin redirect"
    );
    assert_eq!(
        s2_headers
            .get("if-unmodified-since")
            .unwrap()
            .to_str()
            .unwrap(),
        "Sun, 01 Jan 2025 00:00:00 GMT",
        "If-Unmodified-Since should survive cross-origin redirect"
    );
    assert!(
        s2_headers.get("authorization").is_none(),
        "Authorization must NOT leak to CDN after cross-origin redirect"
    );
    let s1_headers = server1.last_request_headers("/download").unwrap();
    assert!(
        s1_headers.get("authorization").is_some(),
        "Origin server should receive Authorization header"
    );
}
#[tokio::test]
async fn test_etag_continuity_through_redirect_happy_path() {
    // When the ETag stays the same across redirected requests, a resume
    // should succeed at the requested offset without a restart.
    let content_size = 1000;
    let redirect = || RequestRule::Redirect {
        status: 302,
        location: "/file".to_string(),
    };
    let serve_v1 = || RequestRule::WithHeaders {
        etag: Some("\"v1\"".to_string()),
        last_modified: None,
        then: Box::new(RequestRule::Serve {
            support_ranges: true,
        }),
    };
    let config = ServerConfig::new(content_size, vec![])
        .with_path_rules("/download", vec![redirect(), redirect()])
        .with_path_rules("/file", vec![serve_v1(), serve_v1()])
        .with_etag(None)
        .with_last_modified(None);
    let server = TestServer::start(config).await;
    let mut source = HttpSourceProtocol::new(HeaderMap::default()).unwrap();
    let (reader, _) = source.get_reader(server.url("/download"), 0).await.unwrap();
    let _ = collect_reader(reader).await.unwrap();
    let (reader, offset) = source
        .get_reader(server.url("/download"), 500)
        .await
        .unwrap();
    assert_eq!(
        offset.offset, 500,
        "resume should succeed when ETag is consistent through redirects"
    );
    let remaining = collect_reader(reader).await.unwrap();
    assert_eq!(remaining.len(), 500);
    let expected = common::test_server::generate_content(content_size);
    assert_eq!(remaining, &expected[500..]);
    assert_eq!(server.request_count("/download"), 2);
    assert_eq!(server.request_count("/file"), 2);
}
#[tokio::test]
async fn test_cross_server_redirect_etag_mismatch_triggers_restart() {
    // The CDN serves v1 first, then v2. A resume sees the mismatch and the
    // client restarts from 0, re-traversing the redirect chain.
    let content_size = 1000;
    let serve_tagged = |etag: &str| RequestRule::WithHeaders {
        etag: Some(etag.to_string()),
        last_modified: None,
        then: Box::new(RequestRule::Serve {
            support_ranges: true,
        }),
    };
    let server2 = TestServer::start(
        ServerConfig::new(content_size, vec![])
            .with_path_rules(
                "/file",
                vec![
                    serve_tagged("\"v1\""),
                    serve_tagged("\"v2\""),
                    serve_tagged("\"v2\""),
                ],
            )
            .with_etag(None)
            .with_last_modified(None),
    )
    .await;
    let server1 = TestServer::start(ServerConfig::new(0, vec![]).with_fallback(
        RequestRule::Redirect {
            status: 302,
            location: server2.url("/file").to_string(),
        },
    ))
    .await;
    let mut source = HttpSourceProtocol::new(HeaderMap::default()).unwrap();
    let (reader, _) = source
        .get_reader(server1.url("/download"), 0)
        .await
        .unwrap();
    let _ = collect_reader(reader).await.unwrap();
    let (_reader, offset) = source
        .get_reader(server1.url("/download"), 500)
        .await
        .unwrap();
    assert_eq!(
        offset.offset, 0,
        "should restart from 0 when ETag changes through redirect"
    );
    assert_eq!(
        server2.request_count("/file"),
        3,
        "expected: initial + mismatched resume + restart"
    );
    assert_eq!(
        server1.request_count("/download"),
        3,
        "restart should go through the original URL (redirect chain)"
    );
}
#[tokio::test]
async fn test_412_through_redirect_triggers_restart() {
    // A 412 (Precondition Failed) on the resume attempt means the validators
    // no longer match: the client must restart from 0 via the redirect.
    let content_size = 1000;
    let serve_tagged = |etag: &str| RequestRule::WithHeaders {
        etag: Some(etag.to_string()),
        last_modified: None,
        then: Box::new(RequestRule::Serve {
            support_ranges: true,
        }),
    };
    let file_rules = vec![
        serve_tagged("\"v1\""),
        RequestRule::Error {
            status: 412,
            retry_after: None,
        },
        serve_tagged("\"v2\""),
    ];
    let server2 = TestServer::start(
        ServerConfig::new(content_size, vec![])
            .with_path_rules("/file", file_rules)
            .with_etag(None)
            .with_last_modified(None),
    )
    .await;
    let server1 = TestServer::start(ServerConfig::new(0, vec![]).with_fallback(
        RequestRule::Redirect {
            status: 302,
            location: server2.url("/file").to_string(),
        },
    ))
    .await;
    let mut source = HttpSourceProtocol::new(HeaderMap::default()).unwrap();
    let (reader, _) = source
        .get_reader(server1.url("/download"), 0)
        .await
        .unwrap();
    let _ = collect_reader(reader).await.unwrap();
    let (_reader, offset) = source
        .get_reader(server1.url("/download"), 500)
        .await
        .unwrap();
    assert_eq!(
        offset.offset, 0,
        "should restart from 0 after 412 through redirect"
    );
    assert_eq!(
        server2.request_count("/file"),
        3,
        "expected: initial + 412 resume + restart"
    );
    assert_eq!(
        server1.request_count("/download"),
        3,
        "restart should traverse the redirect chain again"
    );
}
#[tokio::test]
#[expect(
clippy::similar_names,
reason = "server2a/server2b are intentional CDN node names"
)]
async fn test_redirect_target_changes_between_requests() {
let content_size = 1000;
let server2a = TestServer::start(
ServerConfig::new(
content_size,
vec![RequestRule::Serve {
support_ranges: true,
}],
)
.with_etag(Some("\"node-a-etag\"".to_string())),
)
.await;
let server2b = TestServer::start(
ServerConfig::new(content_size, vec![])
.with_fallback(RequestRule::Serve {
support_ranges: true,
})
.with_etag(Some("\"node-b-etag\"".to_string())),
)
.await;
let server1 = TestServer::start(ServerConfig::new(0, vec![]).with_path_rules(
"/download",
vec![
RequestRule::Redirect {
status: 302,
location: server2a.url("/file").to_string(),
},
RequestRule::Redirect {
status: 302,
location: server2b.url("/file").to_string(),
},
RequestRule::Redirect {
status: 302,
location: server2b.url("/file").to_string(),
},
],
))
.await;
let mut source = HttpSourceProtocol::new(HeaderMap::default()).unwrap();
let (reader, _) = source
.get_reader(server1.url("/download"), 0)
.await
.unwrap();
let _ = collect_reader(reader).await.unwrap();
let (_reader, offset) = source
.get_reader(server1.url("/download"), 500)
.await
.unwrap();
assert_eq!(
offset.offset, 0,
"should restart when redirect target changes and ETags differ"
);
assert_eq!(
server2a.request_count("/file"),
1,
"node A: initial request only"
);
assert_eq!(
server2b.request_count("/file"),
2,
"node B: mismatched resume + restart"
);
}
#[tokio::test]
async fn test_stream_source_with_accept_ranges() {
    // stream_from_url should report the total size up front and yield the
    // exact file bytes for a range-capable source.
    let content_size = 5_000;
    let server = TestServer::start(ServerConfig::new(
        content_size,
        vec![RequestRule::Serve {
            support_ranges: true,
        }],
    ))
    .await;
    let config = ripcurl::transfer::TransferConfig {
        max_retries: 3,
        overwrite: false,
        custom_http_headers: vec![],
    };
    let (stream, info) = ripcurl::stream::stream_from_url(server.url("/file"), &config)
        .await
        .unwrap();
    assert_eq!(info.total_size, Some(content_size as u64));
    let mut stream = pin!(stream);
    let mut collected = Vec::new();
    while let Some(chunk) = stream.next().await {
        collected.extend_from_slice(&chunk.unwrap());
    }
    assert_eq!(collected, common::test_server::generate_content(content_size));
}
#[tokio::test]
async fn test_stream_source_rejects_no_accept_ranges() {
    // Streaming requires random access; a source without range support must
    // be rejected up front with a Permanent error that says so.
    let content_size = 5_000;
    let server = TestServer::start(ServerConfig::new(
        content_size,
        vec![RequestRule::Serve {
            support_ranges: false,
        }],
    ))
    .await;
    let config = ripcurl::transfer::TransferConfig {
        max_retries: 3,
        overwrite: false,
        custom_http_headers: vec![],
    };
    let result = ripcurl::stream::stream_from_url(server.url("/file"), &config).await;
    match result {
        Err(TransferError::Permanent { reason }) => {
            assert!(
                reason.contains("random access"),
                "expected 'random access' in error, got: {reason}"
            );
        }
        Err(other) => panic!("expected Permanent error about random access, got: {other:?}"),
        Ok(_) => panic!("expected error for source without random access, got Ok"),
    }
}
#[tokio::test]
async fn test_decode_error_mid_stream_is_transient() {
    // A mid-stream decode failure must surface as Transient with an
    // accurate consumed_byte_count so the transfer can be resumed.
    let content_size = 10_000;
    let bytes_before_error = 5_000;
    let server = TestServer::start(ServerConfig::new(
        content_size,
        vec![RequestRule::PartialThenError { bytes_before_error }],
    ))
    .await;
    let mut source = HttpSourceProtocol::new(HeaderMap::default()).unwrap();
    let (reader, _offset) = source.get_reader(server.url("/file"), 0).await.unwrap();
    let mut stream = pin!(reader.stream_bytes());
    let mut total = 0u64;
    let mut failure = None;
    while let Some(item) = stream.next().await {
        match item {
            Ok(chunk) => total += chunk.len() as u64,
            Err(e) => {
                failure = Some(e);
                break;
            }
        }
    }
    match failure.expect("should have received a stream error") {
        TransferError::Transient {
            consumed_byte_count,
            minimum_retry_delay,
            reason,
        } => {
            assert_eq!(
                consumed_byte_count, total,
                "consumed_byte_count should match bytes received before error"
            );
            assert_eq!(minimum_retry_delay, Duration::from_secs(1));
            assert!(
                reason.contains("decode"),
                "error message should mention decode: {reason}"
            );
        }
        TransferError::Permanent { reason } => {
            panic!("BUG: decode error was classified as Permanent (should be Transient): {reason}");
        }
    }
}