use aws_config::BehaviorVersion;
use aws_sdk_s3::config::Credentials;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::{Client, Config as S3Config};
use faucet_core::{DEFAULT_BATCH_SIZE, Source};
use faucet_source_s3::{S3FileFormat, S3Source, S3SourceConfig};
use futures::StreamExt;
use std::collections::HashMap;
use std::time::Instant;
use testcontainers::{ContainerAsync, runners::AsyncRunner};
use testcontainers_modules::minio::MinIO;
/// Access key used by both the admin client and the source under test.
/// NOTE(review): presumably the MinIO image's default root user — confirm.
const ACCESS_KEY: &str = "minioadmin";
/// Secret key paired with [`ACCESS_KEY`].
const SECRET_KEY: &str = "minioadmin";
/// Region passed to the admin client and to the source configuration.
const REGION: &str = "us-east-1";
/// Bucket created by `seed_bucket` and read by every source built in this file.
const TEST_BUCKET: &str = "faucet-stream-tests";
/// Starts a MinIO container and returns it together with its HTTP endpoint.
///
/// The endpoint is built from the host port mapped to container port 9000.
/// The container handle is returned so the caller decides how long it lives.
///
/// # Panics
/// Panics if the container cannot be started or the port cannot be resolved.
async fn start_minio() -> (ContainerAsync<MinIO>, String) {
    let minio = MinIO::default()
        .start()
        .await
        .expect("minio container start");
    let host_port = minio.get_host_port_ipv4(9000).await.expect("minio port");
    (minio, format!("http://127.0.0.1:{host_port}"))
}
/// Builds an S3 [`Client`] for test setup against the given endpoint.
///
/// Uses the static test credentials and region, and forces path-style
/// addressing on the resulting S3 configuration.
async fn build_admin_client(endpoint: &str) -> Client {
    let static_creds = Credentials::new(ACCESS_KEY, SECRET_KEY, None, None, "test");
    let loader = aws_config::defaults(BehaviorVersion::latest())
        .region(aws_config::Region::new(REGION))
        .endpoint_url(endpoint)
        .credentials_provider(static_creds);
    let shared_config = loader.load().await;

    // Start from the shared SDK config, then switch on path-style addressing.
    let path_style_config = S3Config::from(&shared_config)
        .to_builder()
        .force_path_style(true)
        .build();
    Client::from_conf(path_style_config)
}
/// Creates [`TEST_BUCKET`] and uploads each `(key, body)` pair into it.
///
/// Objects are uploaded one after another in slice order. An empty slice
/// leaves the bucket created but empty.
///
/// # Panics
/// Panics if bucket creation or any upload fails.
async fn seed_bucket(endpoint: &str, objects: &[(String, String)]) {
    let s3 = build_admin_client(endpoint).await;

    let created = s3.create_bucket().bucket(TEST_BUCKET).send().await;
    created.expect("create bucket");

    for (key, body) in objects.iter() {
        // The body is copied because the upload needs owned bytes.
        let payload = ByteStream::from(body.as_bytes().to_vec());
        let upload = s3.put_object().bucket(TEST_BUCKET).key(key).body(payload);
        upload.send().await.expect("put object");
    }
}
/// Builds an [`S3Source`] from `config`, pointed at `endpoint` and [`REGION`].
///
/// Before constructing the source, the AWS credential and region environment
/// variables are exported with the test constants.
/// NOTE(review): presumably `S3Source::new` resolves credentials from the
/// standard AWS environment variables — confirm against the source crate.
///
/// The export happens at most once per test process. Every test in this file
/// calls this helper, and the harness runs tests on parallel threads, so
/// re-writing the same constants from every thread would only add concurrent
/// mutation of the process environment.
///
/// # Panics
/// Panics if `S3Source::new` returns an error.
async fn build_source(endpoint: &str, config: S3SourceConfig) -> S3Source {
    static EXPORT_AWS_ENV: std::sync::Once = std::sync::Once::new();
    EXPORT_AWS_ENV.call_once(|| {
        // SAFETY: `set_var` is only sound while no other thread accesses the
        // environment outside of Rust's std. The values are constants, so they
        // are written exactly once behind `Once`; concurrent callers block in
        // `call_once` until the write has finished instead of repeating it.
        // This narrows, but cannot fully rule out, a race with non-Rust code
        // reading the environment on another thread during that single write.
        unsafe {
            std::env::set_var("AWS_ACCESS_KEY_ID", ACCESS_KEY);
            std::env::set_var("AWS_SECRET_ACCESS_KEY", SECRET_KEY);
            std::env::set_var("AWS_DEFAULT_REGION", REGION);
        }
    });

    let config = config
        .endpoint_url(endpoint.to_string())
        .region(REGION.to_string());
    S3Source::new(config).await.expect("S3Source::new")
}
/// Renders one `{"id":N}` JSON line, each terminated by `\n`, for every
/// integer from `start` through `end_inclusive`.
///
/// Returns an empty string when `start > end_inclusive`.
fn jsonl_body(start: i64, end_inclusive: i64) -> String {
    (start..=end_inclusive)
        .map(|i| format!("{{\"id\":{i}}}\n"))
        .collect()
}
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_chunks_jsonl_lines_into_batch_sized_pages() {
let (_container, endpoint) = start_minio().await;
seed_bucket(
&endpoint,
&[("data.jsonl".to_string(), jsonl_body(1, 10_000))],
)
.await;
let config = S3SourceConfig::new(TEST_BUCKET).with_batch_size(1000);
let source = build_source(&endpoint, config).await;
let ctx: HashMap<String, serde_json::Value> = HashMap::new();
let mut pages = source.stream_pages(&ctx, 1000);
let mut page_count = 0;
let mut total = 0;
while let Some(page) = pages.next().await {
let page = page.expect("page ok");
page_count += 1;
total += page.records.len();
assert_eq!(
page.records.len(),
1000,
"every page must be batch_size when total is a multiple"
);
assert!(
page.bookmark.is_none(),
"S3 source has no incremental mode; bookmark must be None"
);
}
assert_eq!(page_count, 10, "10_000 / 1000 = 10 pages");
assert_eq!(total, 10_000);
}
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_flattens_across_multiple_jsonl_objects() {
let (_container, endpoint) = start_minio().await;
seed_bucket(
&endpoint,
&[
("a.jsonl".to_string(), jsonl_body(1, 700)),
("b.jsonl".to_string(), jsonl_body(701, 1400)),
("c.jsonl".to_string(), jsonl_body(1401, 2100)),
],
)
.await;
let config = S3SourceConfig::new(TEST_BUCKET).with_batch_size(500);
let source = build_source(&endpoint, config).await;
let ctx: HashMap<String, serde_json::Value> = HashMap::new();
let mut pages = source.stream_pages(&ctx, 500);
let mut sizes = Vec::new();
let mut ids: Vec<i64> = Vec::new();
while let Some(page) = pages.next().await {
let page = page.expect("page ok");
sizes.push(page.records.len());
for r in &page.records {
ids.push(r["id"].as_i64().expect("id"));
}
}
assert_eq!(sizes, vec![500, 500, 500, 500, 100], "flattened page sizes");
assert_eq!(ids.len(), 2100);
ids.sort_unstable();
let expected: Vec<i64> = (1..=2100).collect();
assert_eq!(ids, expected, "all ids preserved across object flattening");
}
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_partial_final_page() {
let (_container, endpoint) = start_minio().await;
seed_bucket(
&endpoint,
&[("data.jsonl".to_string(), jsonl_body(1, 2_500))],
)
.await;
let config = S3SourceConfig::new(TEST_BUCKET).with_batch_size(1000);
let source = build_source(&endpoint, config).await;
let ctx: HashMap<String, serde_json::Value> = HashMap::new();
let mut pages = source.stream_pages(&ctx, 1000);
let mut sizes = Vec::new();
while let Some(page) = pages.next().await {
let page = page.expect("page ok");
sizes.push(page.records.len());
}
assert_eq!(sizes, vec![1000, 1000, 500], "partial trailing page");
}
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_batch_size_zero_emits_one_page_per_object() {
let (_container, endpoint) = start_minio().await;
seed_bucket(
&endpoint,
&[
("a.jsonl".to_string(), jsonl_body(1, 100)),
("b.jsonl".to_string(), jsonl_body(101, 350)),
("c.jsonl".to_string(), jsonl_body(351, 351)),
],
)
.await;
let config = S3SourceConfig::new(TEST_BUCKET).with_batch_size(0);
let source = build_source(&endpoint, config).await;
let ctx: HashMap<String, serde_json::Value> = HashMap::new();
let mut pages = source.stream_pages(&ctx, 0);
let mut sizes = Vec::new();
while let Some(page) = pages.next().await {
let page = page.expect("page ok");
sizes.push(page.records.len());
}
sizes.sort_unstable();
let mut expected = vec![100, 250, 1];
expected.sort_unstable();
assert_eq!(
sizes, expected,
"one page per object, no within-object chunking"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_preserves_row_contents() {
let (_container, endpoint) = start_minio().await;
let body = "{\"id\":1,\"name\":\"alpha\"}\n\
{\"id\":2,\"name\":\"beta\"}\n\
{\"id\":3,\"name\":\"gamma\"}\n";
seed_bucket(&endpoint, &[("items.jsonl".to_string(), body.to_string())]).await;
let config = S3SourceConfig::new(TEST_BUCKET).with_batch_size(2);
let source = build_source(&endpoint, config).await;
let ctx: HashMap<String, serde_json::Value> = HashMap::new();
let mut pages = source.stream_pages(&ctx, 2);
let mut all = Vec::new();
while let Some(page) = pages.next().await {
let page = page.expect("page ok");
all.extend(page.records);
}
assert_eq!(all.len(), 3);
assert_eq!(all[0]["id"], 1);
assert_eq!(all[0]["name"], "alpha");
assert_eq!(all[2]["name"], "gamma");
}
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_empty_result_yields_no_pages() {
let (_container, endpoint) = start_minio().await;
seed_bucket(&endpoint, &[]).await;
let config = S3SourceConfig::new(TEST_BUCKET).with_batch_size(DEFAULT_BATCH_SIZE);
let source = build_source(&endpoint, config).await;
let ctx: HashMap<String, serde_json::Value> = HashMap::new();
let mut pages = source.stream_pages(&ctx, DEFAULT_BATCH_SIZE);
let mut page_count = 0;
while let Some(page) = pages.next().await {
let _ = page.expect("page ok");
page_count += 1;
}
assert_eq!(page_count, 0, "empty bucket must yield zero pages");
}
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_json_array_chunks_records() {
let (_container, endpoint) = start_minio().await;
let mut elements = Vec::new();
for i in 1..=2500 {
elements.push(serde_json::json!({"id": i}));
}
let body = serde_json::to_string(&serde_json::Value::Array(elements)).unwrap();
seed_bucket(&endpoint, &[("data.json".to_string(), body)]).await;
let config = S3SourceConfig::new(TEST_BUCKET)
.file_format(S3FileFormat::JsonArray)
.with_batch_size(1000);
let source = build_source(&endpoint, config).await;
let ctx: HashMap<String, serde_json::Value> = HashMap::new();
let mut pages = source.stream_pages(&ctx, 1000);
let mut sizes = Vec::new();
while let Some(page) = pages.next().await {
let page = page.expect("page ok");
sizes.push(page.records.len());
}
assert_eq!(
sizes,
vec![1000, 1000, 500],
"JSON-array source chunked into batch_size pages with a partial tail"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_first_page_completes_without_parsing_full_object() {
let (_container, endpoint) = start_minio().await;
seed_bucket(
&endpoint,
&[("big.jsonl".to_string(), jsonl_body(1, 200_000))],
)
.await;
let config_full = S3SourceConfig::new(TEST_BUCKET).with_batch_size(1000);
let source = build_source(&endpoint, config_full).await;
let ctx: HashMap<String, serde_json::Value> = HashMap::new();
let start = Instant::now();
let mut full_pages = source.stream_pages(&ctx, 1000);
while let Some(page) = full_pages.next().await {
let _ = page.expect("page ok");
}
let full_elapsed = start.elapsed();
drop(full_pages);
drop(source);
let config_first = S3SourceConfig::new(TEST_BUCKET).with_batch_size(1000);
let source = build_source(&endpoint, config_first).await;
let start = Instant::now();
let mut first_pages = source.stream_pages(&ctx, 1000);
let first_page = first_pages
.next()
.await
.expect("first page exists")
.expect("page ok");
let first_elapsed = start.elapsed();
drop(first_pages);
assert_eq!(first_page.records.len(), 1000);
assert!(
first_elapsed * 2 < full_elapsed,
"first page should arrive without parsing the full object; \
first page took {first_elapsed:?}, full drain took {full_elapsed:?}"
);
}