pub mod builder;
use std::cmp::Reverse;
use std::ops::Range;
use std::sync::Arc;
use async_trait::async_trait;
use line_batch_processor::{LineBatchProcessor, LineBatchProcessorOutputPort};
use negative_slice_pass::MorselStreamReverser;
use polars_buffer::Buffer;
use polars_error::{PolarsResult, polars_bail, polars_err};
use polars_io::cloud::CloudOptions;
use polars_io::utils::compression::CompressedReader;
use polars_plan::dsl::ScanSource;
use polars_utils::IdxSize;
use polars_utils::priority::Priority;
use polars_utils::slice_enum::Slice;
use row_index_limit_pass::ApplyRowIndexOrLimit;
use super::multi_scan::reader_interface::output::FileReaderOutputRecv;
use super::multi_scan::reader_interface::{BeginReadArgs, FileReader, FileReaderCallbacks};
use crate::async_executor::{AbortOnDropHandle, spawn};
use crate::async_primitives::distributor_channel::distributor_channel;
use crate::async_primitives::linearizer::Linearizer;
use crate::async_primitives::oneshot_channel;
use crate::morsel::SourceToken;
use crate::nodes::compute_node_prelude::*;
use crate::nodes::io_sources::multi_scan::reader_interface::Projection;
use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputSend;
use crate::nodes::io_sources::ndjson::chunk_reader::ChunkReaderBuilder;
use crate::nodes::io_sources::ndjson::line_batch_distributor::RowSkipper;
use crate::nodes::{MorselSeq, TaskPriority};
pub(super) mod chunk_reader;
mod line_batch_distributor;
mod line_batch_processor;
mod negative_slice_pass;
mod row_index_limit_pass;
/// Streaming-engine file reader for NDJSON (newline-delimited JSON) sources.
///
/// The whole source is materialized into memory by [`FileReader::initialize`]
/// (stored in `cached_bytes`); `begin_read` then distributes line batches to
/// parallel chunk-decoding workers.
#[derive(Clone)]
pub struct NDJsonFileReader {
    /// Origin of the bytes (path, in-memory buffer, ...).
    pub scan_source: ScanSource,
    /// Not used by this reader; kept for interface parity with other readers.
    #[expect(unused)]
    pub cloud_options: Option<Arc<CloudOptions>>,
    /// Builds the per-worker chunk decoder for the projected schema.
    pub chunk_reader_builder: ChunkReaderBuilder,
    /// Counts rows (presumably newline-delimited records — confirm at the
    /// builder's definition) in a raw byte slice.
    pub count_rows_fn: fn(&[u8]) -> usize,
    /// File contents; `None` until `initialize` has run.
    pub cached_bytes: Option<Buffer<u8>>,
    /// Enables diagnostic logging to stderr.
    pub verbose: bool,
}
#[async_trait]
impl FileReader for NDJsonFileReader {
    /// Reads the entire scan source into an in-memory buffer and caches it in
    /// `self.cached_bytes` for later use by `begin_read`.
    async fn initialize(&mut self) -> PolarsResult<()> {
        let memslice = self
            .scan_source
            .as_scan_source_ref()
            .to_buffer_async_assume_latest(self.scan_source.run_async())?;
        self.cached_bytes = Some(memslice);
        Ok(())
    }

    /// Spawns the NDJSON decoding pipeline and returns the morsel output port
    /// together with a "finishing" task handle that joins every spawned
    /// worker and dispatches the requested callbacks (file schema, row
    /// position on end, total row count).
    ///
    /// Pipeline topology:
    /// `LineBatchDistributor` -> `num_pipelines` x `LineBatchProcessor` ->
    /// optional ordered post-processing (`MorselStreamReverser` for negative
    /// slices, or `ApplyRowIndexOrLimit` for positive slices / row indices)
    /// -> output port.
    ///
    /// # Panics
    /// Panics if `args` carries anything this reader does not support (a
    /// non-plain projection or a pushed-down predicate).
    fn begin_read(
        &mut self,
        args: BeginReadArgs,
    ) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)> {
        let verbose = self.verbose;

        // Only plain projections without a predicate are supported; anything
        // else reaching this reader is a planner bug, hence panic not error.
        let BeginReadArgs {
            projection: Projection::Plain(projected_schema),
            mut row_index,
            pre_slice,
            num_pipelines,
            disable_morsel_split: _,
            callbacks:
                FileReaderCallbacks {
                    file_schema_tx,
                    n_rows_in_file_tx,
                    row_position_on_end_tx,
                },
            predicate: None,
            cast_columns_policy: _,
        } = args
        else {
            panic!("unsupported args: {:?}", &args)
        };

        let is_empty_slice = pre_slice.as_ref().is_some_and(|x| x.len() == 0);
        // Negative slices are served by reading the byte stream in reverse.
        let is_negative_slice = matches!(pre_slice, Some(Slice::Negative { .. }));

        // `initialize` must have populated `cached_bytes` before this point.
        let reader = CompressedReader::try_new(self.cached_bytes.clone().unwrap())?;

        let schema = projected_schema;
        if let Some(tx) = file_schema_tx {
            _ = tx.send(schema.clone())
        }

        // Normalize the pre-slice to a half-open row range. For a negative
        // slice the bounds are distances from the end of the file (the stream
        // is consumed right-to-left — see `offset_len_rtl` below).
        let global_slice: Option<Range<usize>> = if let Some(slice) = pre_slice.clone() {
            match slice {
                Slice::Positive { offset, len } => Some(offset..offset.saturating_add(len)),
                Slice::Negative {
                    offset_from_end,
                    len,
                } => {
                    // Bounds measured from the end of the file.
                    Some(offset_from_end.saturating_sub(len)..offset_from_end)
                },
            }
        } else {
            None
        };

        // Reversed reading + row index: forward row numbers can only be
        // stamped once the total row count is known, so the finishing task
        // sends it to the reverser through this oneshot.
        let (total_row_count_tx, total_row_count_rx) = if is_negative_slice && row_index.is_some() {
            let (tx, rx) = oneshot_channel::channel();
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };

        // Whether workers must keep counting rows even past any limit.
        let needs_total_row_count = total_row_count_tx.is_some()
            || n_rows_in_file_tx.is_some()
            || (row_position_on_end_tx.is_some()
                && matches!(pre_slice, Some(Slice::Negative { .. })));

        if verbose {
            eprintln!(
                "[NDJsonFileReader]: \
                project: {}, \
                global_slice: {:?}, \
                row_index: {:?}, \
                is_negative_slice: {}",
                schema.len(),
                &global_slice,
                &row_index,
                is_negative_slice,
            );
        }

        // Rows dropped from the front of the (possibly reversed) stream.
        let n_rows_to_skip = global_slice.as_ref().map_or(0, |x| x.start);

        // A linearizer restores global morsel order whenever a post-process
        // stage must observe morsels sequentially (slicing / row indexing).
        let (opt_linearizer, mut linearizer_inserters) =
            if global_slice.is_some() || row_index.is_some() {
                let (a, b) =
                    Linearizer::<Priority<Reverse<MorselSeq>, DataFrame>>::new(num_pipelines, 1);
                (Some(a), b)
            } else {
                (None, vec![])
            };

        let output_to_linearizer = opt_linearizer.is_some();

        // Filled in by exactly one of the branches below.
        let mut output_port = None;

        // Optional ordered post-processing stage downstream of the linearizer.
        let opt_post_process_handle = if is_negative_slice {
            let negative_slice = global_slice.unwrap();
            if verbose {
                eprintln!("[NDJsonFileReader]: Initialize morsel stream reverser");
            }
            let (morsel_senders, rx) = FileReaderOutputSend::new_parallel(num_pipelines);
            output_port = Some(rx);
            Some(AbortOnDropHandle::new(spawn(
                TaskPriority::High,
                MorselStreamReverser {
                    morsel_receiver: opt_linearizer.unwrap(),
                    morsel_senders,
                    // (offset, len) in right-to-left coordinates.
                    offset_len_rtl: (
                        negative_slice.start,
                        negative_slice.end - negative_slice.start,
                    ),
                    // The reverser needs the total row count to translate
                    // reversed positions into forward row-index values.
                    row_index: row_index.take().map(|x| (x, total_row_count_rx.unwrap())),
                    verbose,
                }
                .run(),
            )))
        } else if global_slice.is_some() || row_index.is_some() {
            let mut row_index = row_index.take();
            if verbose {
                eprintln!("[NDJsonFileReader]: Initialize ApplyRowIndexOrLimit");
            }
            // Rows skipped up-front still advance the row index, so shift the
            // starting offset (with an explicit overflow check).
            if let Some(ri) = row_index.as_mut() {
                let Some(v) = ri.offset.checked_add(n_rows_to_skip as IdxSize) else {
                    let offset = ri.offset;
                    polars_bail!(
                        ComputeError:
                        "row_index with offset {} overflows at {} rows",
                        offset, n_rows_to_skip
                    )
                };
                ri.offset = v;
            }
            let (morsel_tx, rx) = FileReaderOutputSend::new_serial();
            output_port = Some(rx);
            // The skip part of the slice is handled by the distributor; only
            // the remaining length acts as a limit here.
            let limit = global_slice.as_ref().map(|x| x.len());
            let task = ApplyRowIndexOrLimit {
                morsel_receiver: opt_linearizer.unwrap(),
                morsel_tx,
                limit,
                row_index,
                verbose,
            };
            // An empty slice produces no morsels: drop the task (closing its
            // channels) instead of spawning it.
            if is_empty_slice {
                None
            } else {
                Some(AbortOnDropHandle::new(spawn(
                    TaskPriority::High,
                    task.run(),
                )))
            }
        } else {
            None
        };

        let chunk_reader = self.chunk_reader_builder.build(schema);

        let (line_batch_distribute_tx, line_batch_distribute_receivers) =
            distributor_channel(num_pipelines, 1);

        // Without a post-process stage the workers write to the output port
        // directly, in parallel.
        let mut morsel_senders = if !output_to_linearizer {
            let (senders, outp) = FileReaderOutputSend::new_parallel(num_pipelines);
            assert!(output_port.is_none());
            output_port = Some(outp);
            senders
        } else {
            vec![]
        };

        // One decoding worker per pipeline. Built in reverse index order so
        // that `pop()` on the sender/inserter vectors hands worker `i` the
        // endpoint with the matching index.
        let line_batch_processor_handles = line_batch_distribute_receivers
            .into_iter()
            .enumerate()
            .rev()
            .map(|(worker_idx, line_batch_rx)| {
                let chunk_reader = chunk_reader.clone();
                let count_rows_fn = self.count_rows_fn;
                let source_token = SourceToken::new();
                AbortOnDropHandle::new(spawn(
                    TaskPriority::Low,
                    LineBatchProcessor {
                        worker_idx,
                        chunk_reader,
                        count_rows_fn,
                        line_batch_rx,
                        output_port: if is_empty_slice {
                            // No rows wanted; workers may still count rows.
                            LineBatchProcessorOutputPort::Closed
                        } else if output_to_linearizer {
                            LineBatchProcessorOutputPort::Linearize {
                                tx: linearizer_inserters.pop().unwrap(),
                            }
                        } else {
                            LineBatchProcessorOutputPort::Direct {
                                tx: morsel_senders.pop().unwrap(),
                                source_token,
                            }
                        },
                        needs_total_row_count,
                        // Only one worker logs to avoid interleaved output.
                        verbose: verbose && worker_idx == num_pipelines - 1,
                    }
                    .run(),
                ))
            })
            .collect::<Vec<_>>();

        // Skips the leading `n_rows_to_skip` rows before distribution; in
        // reverse mode the skip is applied from the end of the stream.
        let row_skipper = RowSkipper {
            cfg_n_rows_to_skip: n_rows_to_skip,
            n_rows_skipped: 0,
            is_line: self.chunk_reader_builder.is_line_fn(),
            reverse: is_negative_slice,
        };

        let line_batch_distributor_task_handle = AbortOnDropHandle::new(spawn(
            TaskPriority::Low,
            line_batch_distributor::LineBatchDistributor {
                reader,
                reverse: is_negative_slice,
                row_skipper,
                line_batch_distribute_tx,
            }
            .run(),
        ));

        // Finishing task: joins the distributor and all workers, aggregates
        // row counts, fires the remaining callbacks, then waits for the
        // post-process stage to drain.
        let finishing_handle = spawn(TaskPriority::Low, async move {
            // Rows dropped by the distributor before reaching any worker.
            let n_rows_skipped: usize = line_batch_distributor_task_handle.await?;
            let mut n_rows_processed: usize = 0;
            if verbose {
                eprintln!("[NDJsonFileReader]: line batch distributor handle returned");
            }
            // Each worker returns the number of rows it saw.
            for handle in line_batch_processor_handles {
                n_rows_processed = n_rows_processed.saturating_add(handle.await?);
            }
            let total_row_count =
                needs_total_row_count.then_some(n_rows_skipped.saturating_add(n_rows_processed));
            if verbose {
                eprintln!("[NDJsonFileReader]: line batch processor handles returned");
            }
            if let Some(row_position_on_end_tx) = row_position_on_end_tx {
                let n = match pre_slice {
                    // No slice: position is simply every row seen.
                    None => n_rows_skipped.saturating_add(n_rows_processed),
                    // Positive slice: clamp to the slice end.
                    Some(Slice::Positive { offset, len }) => n_rows_skipped
                        .saturating_add(n_rows_processed)
                        .min(offset.saturating_add(len)),
                    // Negative slice: skipping happened from the end, so
                    // translate back via the total row count (guaranteed
                    // present by `needs_total_row_count`).
                    Some(Slice::Negative { .. }) => {
                        total_row_count.unwrap().saturating_sub(n_rows_skipped)
                    },
                };
                let n = IdxSize::try_from(n)
                    .map_err(|_| polars_err!(bigidx, ctx = "ndjson file", size = n))?;
                _ = row_position_on_end_tx.send(n);
            }
            // Unblock the morsel stream reverser waiting on the total count.
            if let Some(tx) = total_row_count_tx {
                let total_row_count = total_row_count.unwrap();
                if verbose {
                    eprintln!(
                        "[NDJsonFileReader]: \
                        send total row count: {total_row_count}"
                    )
                }
                _ = tx.send(total_row_count);
            }
            if let Some(n_rows_in_file_tx) = n_rows_in_file_tx {
                let total_row_count = total_row_count.unwrap();
                if verbose {
                    eprintln!("[NDJsonFileReader]: send n_rows_in_file: {total_row_count}");
                }
                let num_rows = total_row_count;
                let num_rows = IdxSize::try_from(num_rows)
                    .map_err(|_| polars_err!(bigidx, ctx = "ndjson file", size = num_rows))?;
                _ = n_rows_in_file_tx.send(num_rows);
            }
            // Wait for the post-process stage to finish flushing its output.
            if let Some(handle) = opt_post_process_handle {
                handle.await?;
            }
            if verbose {
                eprintln!("[NDJsonFileReader]: returning");
            }
            Ok(())
        });

        Ok((output_port.unwrap(), finishing_handle))
    }
}