use faucet_core::Source;
use faucet_source_csv::{CsvSource, CsvSourceConfig};
use futures::StreamExt;
use std::collections::HashMap;
use std::io::Write;
use std::time::Instant;
use tempfile::NamedTempFile;
/// Creates a temporary CSV file with a single `id` header column followed by
/// one row for each value in `1..=n`.
///
/// Rows are written through a `BufWriter` so that large fixtures do not issue
/// one unbuffered write on the temp file per row.
///
/// # Panics
///
/// Panics if the temp file cannot be created, written to, or flushed.
fn seed_csv(n: usize) -> NamedTempFile {
    let mut tmp = NamedTempFile::new().expect("tempfile");
    {
        // The buffer only borrows the temp file for this scope, so `tmp` can
        // be returned by value afterwards.
        let mut out = std::io::BufWriter::new(&mut tmp);
        writeln!(out, "id").expect("write header");
        for i in 1..=n {
            writeln!(out, "{i}").expect("write row");
        }
        // Flush explicitly: dropping a `BufWriter` would swallow write errors.
        out.flush().expect("flush");
    }
    tmp
}
/// A 10_000-row file streamed with a page size of 1000 must arrive as ten
/// full pages, none of which carries a bookmark.
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_chunks_rows_into_batch_sized_pages() {
    let fixture = seed_csv(10_000);
    let path = fixture.path().to_str().unwrap();
    let source = CsvSource::new(CsvSourceConfig::new(path).with_batch_size(1000));
    let ctx = HashMap::<String, serde_json::Value>::new();

    let mut stream = source.stream_pages(&ctx, 1000);
    // Row count of every page seen, in arrival order.
    let mut page_sizes: Vec<usize> = Vec::new();
    while let Some(result) = stream.next().await {
        let page = result.expect("page ok");
        let rows = page.records.len();
        page_sizes.push(rows);
        assert_eq!(
            rows, 1000,
            "every page must be exactly batch_size rows when total is a multiple"
        );
        assert!(
            page.bookmark.is_none(),
            "csv source has no incremental mode; bookmark must be None"
        );
    }

    assert_eq!(page_sizes.len(), 10, "10_000 / 1000 = 10 pages");
    assert_eq!(page_sizes.iter().sum::<usize>(), 10_000);
}
/// When the row count is not a multiple of the page size, the last page must
/// contain only the leftover rows.
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_partial_final_page() {
    let fixture = seed_csv(2_500);
    let path = fixture.path().to_str().unwrap();
    let source = CsvSource::new(CsvSourceConfig::new(path).with_batch_size(1000));
    let ctx = HashMap::<String, serde_json::Value>::new();

    let mut stream = source.stream_pages(&ctx, 1000);
    let mut observed = Vec::new();
    while let Some(result) = stream.next().await {
        observed.push(result.expect("page ok").records.len());
    }

    assert_eq!(
        observed,
        vec![1000, 1000, 500],
        "partial trailing page must hold the remainder"
    );
}
/// A batch size of zero must produce exactly one page holding all 2_500 rows.
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_batch_size_zero_emits_single_page() {
    let fixture = seed_csv(2_500);
    let path = fixture.path().to_str().unwrap();
    let source = CsvSource::new(CsvSourceConfig::new(path).with_batch_size(0));
    let ctx = HashMap::<String, serde_json::Value>::new();

    let mut stream = source.stream_pages(&ctx, 0);
    let mut page_lengths = Vec::new();
    loop {
        match stream.next().await {
            Some(result) => page_lengths.push(result.expect("page ok").records.len()),
            None => break,
        }
    }

    assert_eq!(
        page_lengths,
        vec![2_500],
        "batch_size = 0 must drain the file and emit exactly one page"
    );
}
/// A file that contains only a header line must not produce any page.
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_empty_file_yields_no_pages() {
    // Header-only fixture: no data rows follow the column names.
    let mut fixture = NamedTempFile::new().unwrap();
    writeln!(fixture, "id,name").unwrap();
    fixture.flush().unwrap();

    let path = fixture.path().to_str().unwrap();
    let source = CsvSource::new(CsvSourceConfig::new(path).with_batch_size(1000));
    let ctx = HashMap::<String, serde_json::Value>::new();

    let mut stream = source.stream_pages(&ctx, 1000);
    let mut pages_seen = 0;
    while let Some(result) = stream.next().await {
        // Any page that does show up must still be a successful one.
        drop(result.expect("page ok"));
        pages_seen += 1;
    }

    assert_eq!(pages_seen, 0, "empty file must yield zero pages");
}
/// With headers disabled, each record must expose its fields under the
/// positional keys `column_0`, `column_1`, ...
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_headerless_uses_positional_keys() {
    let mut fixture = NamedTempFile::new().unwrap();
    writeln!(fixture, "Alice,30").unwrap();
    writeln!(fixture, "Bob,25").unwrap();
    fixture.flush().unwrap();

    let path = fixture.path().to_str().unwrap();
    let config = CsvSourceConfig::new(path)
        .has_headers(false)
        .with_batch_size(10);
    let source = CsvSource::new(config);
    let ctx = HashMap::<String, serde_json::Value>::new();

    // Flatten every page into one list of records.
    let mut stream = source.stream_pages(&ctx, 10);
    let mut rows = Vec::new();
    while let Some(result) = stream.next().await {
        rows.extend(result.expect("page ok").records);
    }

    assert_eq!(rows.len(), 2);
    let first = &rows[0];
    assert_eq!(first["column_0"], "Alice");
    assert_eq!(first["column_1"], "30");
    let second = &rows[1];
    assert_eq!(second["column_0"], "Bob");
    assert_eq!(second["column_1"], "25");
}
/// Quoted fields containing commas must come through as a single value, and
/// records must keep their header-derived keys when split across pages.
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_preserves_multi_column_and_quoted_fields() {
    let mut fixture = NamedTempFile::new().unwrap();
    writeln!(fixture, "id,name,note").unwrap();
    writeln!(fixture, "1,Alice,\"hello, world\"").unwrap();
    writeln!(fixture, "2,Bob,plain").unwrap();
    writeln!(fixture, "3,Carol,\"a,b,c\"").unwrap();
    fixture.flush().unwrap();

    let path = fixture.path().to_str().unwrap();
    let source = CsvSource::new(CsvSourceConfig::new(path).with_batch_size(2));
    let ctx = HashMap::<String, serde_json::Value>::new();

    // Three rows with a page size of two: gather the records of every page.
    let mut stream = source.stream_pages(&ctx, 2);
    let mut rows = Vec::new();
    while let Some(result) = stream.next().await {
        rows.extend(result.expect("page ok").records);
    }

    assert_eq!(rows.len(), 3);
    let alice = &rows[0];
    assert_eq!(alice["id"], "1");
    assert_eq!(alice["name"], "Alice");
    assert_eq!(alice["note"], "hello, world");
    assert_eq!(rows[1]["note"], "plain");
    assert_eq!(rows[2]["note"], "a,b,c");
}
/// Checks that the first page is produced without waiting for the whole file.
///
/// The test times two passes over the same 200_000-row fixture: a full drain
/// of the page stream, and a fetch of only the first page from a fresh
/// source. It then requires the first-page time to be less than half of the
/// full-drain time.
///
/// NOTE(review): this is a wall-clock comparison, so it is presumably
/// sensitive to machine load and file caching — confirm it is stable on CI.
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_first_page_completes_without_parsing_full_file() {
let tmp = seed_csv(200_000);
let path = tmp.path().to_str().unwrap();
let source = CsvSource::new(CsvSourceConfig::new(path).with_batch_size(1000));
let ctx: HashMap<String, serde_json::Value> = HashMap::new();
// Pass 1: time from creating the stream until it has been fully drained.
let start = Instant::now();
let mut full_pages = source.stream_pages(&ctx, 1000);
while let Some(page) = full_pages.next().await {
let _ = page.expect("page ok");
}
let full_elapsed = start.elapsed();
// Release the first stream and its source before starting the second pass.
drop(full_pages);
drop(source);
// Pass 2: fresh source; time from creating the stream until the first page
// has been received.
let source = CsvSource::new(CsvSourceConfig::new(path).with_batch_size(1000));
let start = Instant::now();
let mut first_pages = source.stream_pages(&ctx, 1000);
let first_page = first_pages
.next()
.await
.expect("first page exists")
.expect("page ok");
let first_elapsed = start.elapsed();
// The rest of the stream is dropped without being consumed.
drop(first_pages);
assert_eq!(first_page.records.len(), 1000);
// The first page must take less than half as long as the full drain.
assert!(
first_elapsed * 2 < full_elapsed,
"first page should arrive without parsing the full file; \
first page took {first_elapsed:?}, full drain took {full_elapsed:?}"
);
}