use std::io::BufReader;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::file_format::JsonDecoder;
use crate::utils::{ChannelReader, JsonArrayToNdjsonReader};
use crate::boundary_stream::AlignedBoundaryStream;
use datafusion_common::error::{DataFusionError, Result};
use datafusion_common::exec_datafusion_err;
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
use datafusion_datasource::{ListingTableUrl, PartitionedFile, as_file_source};
use datafusion_physical_plan::projection::ProjectionExprs;
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use arrow::array::RecordBatch;
use arrow::json::ReaderBuilder;
use arrow::{datatypes::SchemaRef, json};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::{Stream, StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;
use tokio_stream::wrappers::ReceiverStream;
/// Capacity, in `Bytes` chunks, of the channel that carries decompressed data
/// from the async reader task to the blocking JSON-array parser.
const CHANNEL_BUFFER_SIZE: usize = 128;
/// Buffer capacity (2 MiB) handed to `JsonArrayToNdjsonReader::with_capacity`
/// when a JSON array is converted to newline-delimited JSON.
const JSON_CONVERTER_BUFFER_SIZE: usize = 2 << 20;
/// Record-batch stream used for the JSON-array format when the object store
/// hands back a byte stream.
///
/// Batches are received on `inner` from a background parse task. The two
/// background task handles are stored here only so that their lifetime is
/// tied to this stream; dropping the stream drops them (`SpawnedTask` is
/// expected to abort its task on drop — confirm against its docs).
struct JsonArrayStream {
    /// Receiving end of the channel the parse task sends decoded batches into.
    inner: ReceiverStream<std::result::Result<RecordBatch, arrow::error::ArrowError>>,
    /// Async task forwarding decompressed bytes to the parser.
    _read_task: SpawnedTask<()>,
    /// Blocking task converting the JSON array and decoding Arrow batches.
    _parse_task: SpawnedTask<()>,
}

impl Stream for JsonArrayStream {
    type Item = std::result::Result<RecordBatch, arrow::error::ArrowError>;

    /// Delegates to the underlying receiver stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Every field is `Unpin`, so the pinned reference can be unwrapped.
        let this = self.get_mut();
        this.inner.poll_next_unpin(cx)
    }

    /// Forwards the receiver stream's size hint.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let hint = self.inner.size_hint();
        hint
    }
}
/// [`FileOpener`] that decodes JSON files into Arrow record batches.
///
/// Supports newline-delimited JSON and a top-level JSON array, selected by
/// `newline_delimited`; only the newline-delimited form accepts byte-range
/// scans.
pub struct JsonOpener {
    /// Maximum number of rows per emitted batch.
    batch_size: usize,
    /// Schema the decoded batches are produced in.
    projected_schema: SchemaRef,
    /// Compression applied to the stored file bytes.
    file_compression_type: FileCompressionType,
    /// Store the files are fetched from.
    object_store: Arc<dyn ObjectStore>,
    /// `true` for NDJSON input, `false` for a JSON array.
    newline_delimited: bool,
}

impl JsonOpener {
    /// Creates an opener from its individual settings.
    pub fn new(
        batch_size: usize,
        projected_schema: SchemaRef,
        file_compression_type: FileCompressionType,
        object_store: Arc<dyn ObjectStore>,
        newline_delimited: bool,
    ) -> Self {
        Self {
            object_store,
            projected_schema,
            batch_size,
            newline_delimited,
            file_compression_type,
        }
    }
}
/// [`FileSource`] for JSON data: newline-delimited by default, or a single
/// top-level JSON array when configured with
/// [`JsonSource::with_newline_delimited`]`(false)`.
#[derive(Clone)]
pub struct JsonSource {
    /// Full table schema (file columns plus any partition columns).
    table_schema: datafusion_datasource::TableSchema,
    /// Rows per batch; must be set (via `with_batch_size`) before files are opened.
    batch_size: Option<usize>,
    metrics: ExecutionPlanMetricsSet,
    /// Projection pushed down into this source.
    projection: SplitProjection,
    /// `true` for NDJSON input, `false` for the JSON-array format.
    newline_delimited: bool,
}

impl JsonSource {
    /// Creates an unprojected, newline-delimited source for `table_schema`
    /// with no batch size configured yet.
    pub fn new(table_schema: impl Into<datafusion_datasource::TableSchema>) -> Self {
        let table_schema: datafusion_datasource::TableSchema = table_schema.into();
        let projection = SplitProjection::unprojected(&table_schema);
        Self {
            table_schema,
            batch_size: None,
            metrics: ExecutionPlanMetricsSet::new(),
            projection,
            newline_delimited: true,
        }
    }

    /// Chooses between NDJSON (`true`) and JSON-array (`false`) input.
    pub fn with_newline_delimited(self, newline_delimited: bool) -> Self {
        Self {
            newline_delimited,
            ..self
        }
    }
}

impl From<JsonSource> for Arc<dyn FileSource> {
    fn from(json_source: JsonSource) -> Self {
        as_file_source(json_source)
    }
}
impl FileSource for JsonSource {
    /// Builds the opener for this source.
    ///
    /// The result is a [`JsonOpener`] that decodes only the file columns listed
    /// in `self.projection.file_indices`, wrapped in a [`ProjectionOpener`]
    /// built from the same split projection.
    ///
    /// # Panics
    /// Panics if no batch size was configured through `with_batch_size`.
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        _partition: usize,
    ) -> Result<Arc<dyn FileOpener>> {
        let projected_schema = Arc::new(
            self.table_schema
                .file_schema()
                .project(&self.projection.file_indices)?,
        );
        let batch_size = self
            .batch_size
            .expect("Batch size must set before creating opener");
        let json_opener: Arc<dyn FileOpener> = Arc::new(JsonOpener {
            batch_size,
            projected_schema,
            file_compression_type: base_config.file_compression_type,
            object_store,
            newline_delimited: self.newline_delimited,
        });
        let wrapped: Arc<dyn FileOpener> = ProjectionOpener::try_new(
            self.projection.clone(),
            json_opener,
            self.table_schema.file_schema(),
        )?;
        Ok(wrapped)
    }

    fn table_schema(&self) -> &datafusion_datasource::TableSchema {
        &self.table_schema
    }

    /// Returns a copy of this source with the batch size set.
    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        Arc::new(Self {
            batch_size: Some(batch_size),
            ..self.clone()
        })
    }

    /// Merges `projection` into the current projection and returns a copy of
    /// this source carrying the merged, re-split projection.
    fn try_pushdown_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn FileSource>>> {
        let merged = self.projection.source.try_merge(projection)?;
        let split = SplitProjection::new(self.table_schema.file_schema(), &merged);
        Ok(Some(Arc::new(Self {
            projection: split,
            ..self.clone()
        })))
    }

    fn projection(&self) -> Option<&ProjectionExprs> {
        Some(&self.projection.source)
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn file_type(&self) -> &str {
        "json"
    }
}
impl FileOpener for JsonOpener {
fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
let store = Arc::clone(&self.object_store);
let schema = Arc::clone(&self.projected_schema);
let batch_size = self.batch_size;
let file_compression_type = self.file_compression_type.to_owned();
let newline_delimited = self.newline_delimited;
if !newline_delimited && partitioned_file.range.is_some() {
return Err(DataFusionError::NotImplemented(
"JSON array format does not support range-based file scanning. \
Disable repartition_file_scans or use newline-delimited JSON format."
.to_string(),
));
}
Ok(Box::pin(async move {
let file_size = partitioned_file.object_meta.size;
let location = &partitioned_file.object_meta.location;
if let Some(file_range) = partitioned_file.range.as_ref() {
let raw_start: u64 = file_range.start.try_into().map_err(|_| {
exec_datafusion_err!(
"Expected start range to fit in u64, got {}",
file_range.start
)
})?;
let raw_end: u64 = file_range.end.try_into().map_err(|_| {
exec_datafusion_err!(
"Expected end range to fit in u64, got {}",
file_range.end
)
})?;
let aligned_stream = AlignedBoundaryStream::new(
Arc::clone(&store),
location.clone(),
raw_start,
raw_end,
file_size,
b'\n',
)
.await?
.map_err(DataFusionError::from);
let decoder = ReaderBuilder::new(schema)
.with_batch_size(batch_size)
.build_decoder()?;
let input = file_compression_type
.convert_stream(aligned_stream.boxed())?
.fuse();
let stream = deserialize_stream(
input,
DecoderDeserializer::new(JsonDecoder::new(decoder)),
);
return Ok(stream.map_err(Into::into).boxed());
}
let options = GetOptions::default();
let result = store.get_opts(location, options).await?;
match result.payload {
#[cfg(not(target_arch = "wasm32"))]
GetResultPayload::File(file, _) => {
let bytes = file_compression_type.convert_read(file)?;
if newline_delimited {
let reader = BufReader::new(bytes);
let arrow_reader = ReaderBuilder::new(schema)
.with_batch_size(batch_size)
.build(reader)?;
Ok(futures::stream::iter(arrow_reader)
.map(|r| r.map_err(Into::into))
.boxed())
} else {
let ndjson_reader = JsonArrayToNdjsonReader::with_capacity(
bytes,
JSON_CONVERTER_BUFFER_SIZE,
);
let arrow_reader = ReaderBuilder::new(schema)
.with_batch_size(batch_size)
.build(ndjson_reader)?;
Ok(futures::stream::iter(arrow_reader)
.map(|r| r.map_err(Into::into))
.boxed())
}
}
GetResultPayload::Stream(s) => {
if newline_delimited {
let s = s.map_err(DataFusionError::from);
let decoder = ReaderBuilder::new(schema)
.with_batch_size(batch_size)
.build_decoder()?;
let input =
file_compression_type.convert_stream(s.boxed())?.fuse();
let stream = deserialize_stream(
input,
DecoderDeserializer::new(JsonDecoder::new(decoder)),
);
Ok(stream.map_err(Into::into).boxed())
} else {
let s = s.map_err(DataFusionError::from);
let decompressed_stream =
file_compression_type.convert_stream(s.boxed())?;
let (byte_tx, byte_rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(
CHANNEL_BUFFER_SIZE,
);
let (result_tx, result_rx) = tokio::sync::mpsc::channel(2);
let error_tx = result_tx.clone();
let read_task = SpawnedTask::spawn(async move {
tokio::pin!(decompressed_stream);
while let Some(chunk) = decompressed_stream.next().await {
match chunk {
Ok(bytes) => {
if byte_tx.send(bytes).await.is_err() {
break; }
}
Err(e) => {
let _ = error_tx
.send(Err(
arrow::error::ArrowError::ExternalError(
Box::new(e),
),
))
.await;
break;
}
}
}
});
let parse_task = SpawnedTask::spawn_blocking(move || {
let channel_reader = ChannelReader::new(byte_rx);
let mut ndjson_reader =
JsonArrayToNdjsonReader::with_capacity(
channel_reader,
JSON_CONVERTER_BUFFER_SIZE,
);
match ReaderBuilder::new(schema)
.with_batch_size(batch_size)
.build(&mut ndjson_reader)
{
Ok(arrow_reader) => {
for batch_result in arrow_reader {
if result_tx.blocking_send(batch_result).is_err()
{
break; }
}
}
Err(e) => {
let _ = result_tx.blocking_send(Err(e));
}
}
if let Err(e) = ndjson_reader.validate_complete() {
let _ = result_tx.blocking_send(Err(
arrow::error::ArrowError::JsonError(e.to_string()),
));
}
});
let stream = JsonArrayStream {
inner: ReceiverStream::new(result_rx),
_read_task: read_task,
_parse_task: parse_task,
};
Ok(stream.map(|r| r.map_err(Into::into)).boxed())
}
}
}
}))
}
}
/// Executes `plan` and writes each output partition as newline-delimited JSON
/// to `<path prefix>/part-<partition>.json`, one file per partition.
///
/// Partitions are written concurrently on a [`JoinSet`].
///
/// # Errors
/// Returns the first error surfaced by any partition, as tasks are joined:
/// execution, encoding, or object-store write errors.
///
/// # Panics
/// A panic inside a writer task is resumed on the calling task.
pub async fn plan_to_json(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let table_url = ListingTableUrl::parse(path.as_ref())?;
    let store = task_ctx
        .runtime_env()
        .object_store(&table_url.object_store())?;
    let writer_buffer_size = task_ctx
        .session_config()
        .options()
        .execution
        .objectstore_writer_buffer_size;
    let partition_count = plan.output_partitioning().partition_count();

    let mut join_set = JoinSet::new();
    for partition in 0..partition_count {
        let target = object_store::path::Path::parse(format!(
            "{}/part-{partition}.json",
            table_url.prefix()
        ))?;
        let mut batches = plan.execute(partition, Arc::clone(&task_ctx))?;
        let partition_store = Arc::clone(&store);
        join_set.spawn(async move {
            let mut sink =
                BufWriter::with_capacity(partition_store, target, writer_buffer_size);
            // Serialization buffer, reused across batches.
            let mut scratch: Vec<u8> = Vec::with_capacity(1024);
            while let Some(batch) = batches.next().await.transpose()? {
                let mut json_writer = json::LineDelimitedWriter::new(scratch);
                json_writer.write(&batch)?;
                scratch = json_writer.into_inner();
                sink.write_all(&scratch).await?;
                scratch.clear();
            }
            sink.shutdown().await.map_err(DataFusionError::from)
        });
    }

    while let Some(joined) = join_set.join_next().await {
        match joined {
            Ok(partition_result) => partition_result?,
            Err(join_err) if join_err.is_panic() => {
                std::panic::resume_unwind(join_err.into_panic())
            }
            Err(_) => unreachable!(),
        }
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{CHUNK_SIZES, make_chunked_store};
    use arrow::array::{Int64Array, StringArray};
    use arrow::compute;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use bytes::Bytes;
    use datafusion_datasource::FileRange;
    use object_store::memory::InMemory;
    use object_store::path::Path;
    use object_store::{ObjectStoreExt, PutPayload};

    /// Schema shared by most tests: nullable `id: Int64` and `name: Utf8`.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, true),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    // NOTE(review): `InMemory` presumably serves `GetResultPayload::Stream`, so
    // despite its name this test may exercise the streaming branch rather than
    // the `File` branch; a local-filesystem store would be needed to cover it.
    #[tokio::test]
    async fn test_json_array_from_file() -> Result<()> {
        let json_data = r#"[{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]"#;
        let store = Arc::new(InMemory::new());
        let path = Path::from("test.json");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;
        let opener = JsonOpener::new(
            1024,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );
        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);
        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 2);
        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_from_stream() -> Result<()> {
        let json_data = r#"[{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}, {"id": 3, "name": "charlie"}]"#;
        let store = Arc::new(InMemory::new());
        let path = Path::from("test_stream.json");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;
        // Batch size 2 forces the three rows to span more than one batch.
        let opener = JsonOpener::new(
            2,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );
        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);
        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 3);
        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_nested_objects() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, true),
            Field::new("data", DataType::Utf8, true),
        ]));
        let json_data = r#"[
            {"id": 1, "data": "{\"nested\": true}"},
            {"id": 2, "data": "[1, 2, 3]"}
        ]"#;
        let store = Arc::new(InMemory::new());
        let path = Path::from("nested.json");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;
        let opener = JsonOpener::new(
            1024,
            schema,
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );
        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);
        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 2);
        // Braces and brackets inside string values must pass through the
        // array-to-NDJSON conversion untouched.
        let data = batches[0]
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(data.value(0), r#"{"nested": true}"#);
        assert_eq!(data.value(1), "[1, 2, 3]");
        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_empty() -> Result<()> {
        let json_data = "[]";
        let store = Arc::new(InMemory::new());
        let path = Path::from("empty.json");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;
        let opener = JsonOpener::new(
            1024,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );
        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);
        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 0);
        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_range_not_supported() {
        let store = Arc::new(InMemory::new());
        let path = Path::from("test.json");
        store
            .put(&path, PutPayload::from_static(b"[]"))
            .await
            .unwrap();
        let opener = JsonOpener::new(
            1024,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );
        let meta = store.head(&path).await.unwrap();
        let mut file = PartitionedFile::new(path.to_string(), meta.size);
        file.range = Some(FileRange { start: 0, end: 10 });
        let result = opener.open(file);
        match result {
            Ok(_) => panic!("Expected error for range-based JSON array scanning"),
            Err(e) => {
                assert!(
                    e.to_string().contains("does not support range-based"),
                    "Unexpected error message: {e}"
                );
            }
        }
    }

    #[tokio::test]
    async fn test_ndjson_still_works() -> Result<()> {
        let json_data =
            "{\"id\": 1, \"name\": \"alice\"}\n{\"id\": 2, \"name\": \"bob\"}\n";
        let store = Arc::new(InMemory::new());
        let path = Path::from("test.ndjson");
        store
            .put(&path, PutPayload::from_static(json_data.as_bytes()))
            .await?;
        let opener = JsonOpener::new(
            1024,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            true,
        );
        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);
        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 2);
        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_large_file() -> Result<()> {
        let mut json_data = String::from("[");
        for i in 0..1000 {
            if i > 0 {
                json_data.push(',');
            }
            json_data.push_str(&format!(r#"{{"id": {i}, "name": "user{i}"}}"#));
        }
        json_data.push(']');
        let store = Arc::new(InMemory::new());
        let path = Path::from("large.json");
        store
            .put(&path, PutPayload::from(Bytes::from(json_data)))
            .await?;
        let opener = JsonOpener::new(
            100,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );
        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);
        let stream = opener.open(file)?.await?;
        let batches: Vec<_> = stream.try_collect().await?;
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 1000);
        assert!(batches.len() >= 10);
        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_stream_cancellation() -> Result<()> {
        let mut json_data = String::from("[");
        for i in 0..10000 {
            if i > 0 {
                json_data.push(',');
            }
            json_data.push_str(&format!(r#"{{"id": {i}, "name": "user{i}"}}"#));
        }
        json_data.push(']');
        let store = Arc::new(InMemory::new());
        let path = Path::from("cancel_test.json");
        store
            .put(&path, PutPayload::from(Bytes::from(json_data)))
            .await?;
        let opener = JsonOpener::new(
            10,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            store.clone(),
            false,
        );
        let meta = store.head(&path).await?;
        let file = PartitionedFile::new(path.to_string(), meta.size);
        let mut stream = opener.open(file)?.await?;
        let first_batch = stream.next().await;
        assert!(
            matches!(&first_batch, Some(Ok(batch)) if batch.num_rows() > 0),
            "expected a successful, non-empty first batch"
        );
        // Drop the stream mid-way. The short sleep gives the background tasks
        // time to observe the closed channels; this mainly guards against
        // hangs or panics on early drop.
        drop(stream);
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        Ok(())
    }

    /// Partition counts exercised by the range-scan tests.
    fn get_partition_splits() -> Vec<usize> {
        vec![1usize, 2, 3, 5, 7, 10]
    }

    /// Scans `path` as `num_partitions` contiguous byte ranges of roughly
    /// equal size (NDJSON mode). Returns the batches of every range,
    /// concatenated in range order.
    async fn collect_partitioned_batches(
        store: Arc<dyn ObjectStore>,
        path: &Path,
        file_size: u64,
        num_partitions: usize,
    ) -> Result<Vec<RecordBatch>> {
        // Metadata and opener are identical for every range, so build them once.
        let meta = store.head(path).await?;
        let opener = JsonOpener::new(
            1024,
            test_schema(),
            FileCompressionType::UNCOMPRESSED,
            Arc::clone(&store),
            true,
        );
        let mut all_batches = Vec::new();
        for p in 0..num_partitions {
            let start = (p as u64 * file_size) / num_partitions as u64;
            let end = ((p as u64 + 1) * file_size) / num_partitions as u64;
            let mut file = PartitionedFile::new(path.to_string(), meta.size);
            file.range = Some(FileRange {
                start: start as i64,
                end: end as i64,
            });
            let stream = opener.open(file)?.await?;
            let batches: Vec<_> = stream.try_collect().await?;
            all_batches.extend(batches);
        }
        Ok(all_batches)
    }

    /// Concatenates `batches` (which must match `test_schema`) and sorts the
    /// rows by the `id` column.
    fn concat_and_sort_by_id(batches: &[RecordBatch]) -> Result<RecordBatch> {
        let schema = test_schema();
        let combined = compute::concat_batches(&schema, batches)?;
        let indices = compute::sort_to_indices(combined.column(0), None, None)?;
        let sorted_cols: Vec<_> = combined
            .columns()
            .iter()
            .map(|col| compute::take(col.as_ref(), &indices, None))
            .collect::<std::result::Result<_, _>>()?;
        Ok(RecordBatch::try_new(schema, sorted_cols)?)
    }

    #[tokio::test]
    async fn test_ndjson_partitioned() -> Result<()> {
        let num_rows: usize = 20;
        let mut ndjson = String::new();
        for i in 0..num_rows {
            ndjson.push_str(&format!("{{\"id\": {i}, \"name\": \"user{i}\"}}\n"));
        }
        let ndjson_bytes = Bytes::from(ndjson);
        let file_size = ndjson_bytes.len() as u64;
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(&ndjson_bytes, cs).await;
            for num_partitions in get_partition_splits() {
                let batches = collect_partitioned_batches(
                    Arc::clone(&store),
                    &path,
                    file_size,
                    num_partitions,
                )
                .await?;
                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
                assert_eq!(
                    total, num_rows,
                    "Expected {num_rows} rows with {num_partitions} partitions"
                );
                let result = concat_and_sort_by_id(&batches)?;
                let ids = result
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap();
                let names = result
                    .column(1)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap();
                for i in 0..num_rows {
                    assert_eq!(
                        ids.value(i),
                        i as i64,
                        "id mismatch at row {i} with {num_partitions} partitions"
                    );
                    assert_eq!(
                        names.value(i),
                        format!("user{i}"),
                        "name mismatch at row {i} with {num_partitions} partitions"
                    );
                }
            }
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_ndjson_partitioned_uneven_lines() -> Result<()> {
        let rows: &[(&str, &str)] = &[
            ("1", "alice"),
            ("2", "bob-with-a-longer-name"),
            ("3", "charlie"),
            ("4", "x"),
            ("5", "diana-has-an-even-longer-name-here"),
            ("6", "ed"),
            ("7", "francesca"),
            ("8", "g"),
            ("9", "hector-the-magnificent"),
            ("10", "isabella"),
        ];
        let num_rows = rows.len();
        let mut ndjson = String::new();
        for (id, name) in rows {
            ndjson.push_str(&format!("{{\"id\": {id}, \"name\": \"{name}\"}}\n"));
        }
        let ndjson_bytes = Bytes::from(ndjson);
        let file_size = ndjson_bytes.len() as u64;
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(&ndjson_bytes, cs).await;
            for num_partitions in get_partition_splits() {
                let batches = collect_partitioned_batches(
                    Arc::clone(&store),
                    &path,
                    file_size,
                    num_partitions,
                )
                .await?;
                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
                assert_eq!(
                    total, num_rows,
                    "Expected {num_rows} rows with {num_partitions} partitions"
                );
                let result = concat_and_sort_by_id(&batches)?;
                let ids = result
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap();
                let names = result
                    .column(1)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap();
                for (i, (expected_id, expected_name)) in rows.iter().enumerate() {
                    assert_eq!(
                        ids.value(i),
                        expected_id.parse::<i64>().unwrap(),
                        "id mismatch at row {i} with {num_partitions} partitions"
                    );
                    assert_eq!(
                        names.value(i),
                        *expected_name,
                        "name mismatch at row {i} with {num_partitions} partitions"
                    );
                }
            }
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_ndjson_partitioned_single_entry() -> Result<()> {
        // No trailing newline: the single record must still be read exactly once.
        let ndjson = r#"{"id": 1, "name": "alice"}"#;
        let ndjson_bytes = Bytes::from(ndjson);
        let file_size = ndjson_bytes.len() as u64;
        for &cs in CHUNK_SIZES {
            let (store, path) = make_chunked_store(&ndjson_bytes, cs).await;
            for num_partitions in get_partition_splits() {
                let batches = collect_partitioned_batches(
                    Arc::clone(&store),
                    &path,
                    file_size,
                    num_partitions,
                )
                .await?;
                let total: usize = batches.iter().map(|b| b.num_rows()).sum();
                assert_eq!(
                    total, 1,
                    "Expected exactly 1 row with {num_partitions} partitions"
                );
                let result = concat_and_sort_by_id(&batches)?;
                let ids = result
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .unwrap();
                let names = result
                    .column(1)
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap();
                assert_eq!(ids.value(0), 1);
                assert_eq!(names.value(0), "alice");
            }
        }
        Ok(())
    }
}