use std::cmp::Reverse;
use polars_buffer::Buffer;
use polars_error::PolarsResult;
use polars_utils::priority::Priority;
use super::chunk_reader::ChunkReader;
use crate::async_primitives::distributor_channel;
use crate::async_primitives::linearizer::Inserter;
use crate::morsel::SourceToken;
use crate::nodes::MorselSeq;
use crate::nodes::compute_node_prelude::*;
use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputSend;
/// Worker that receives [`LineBatch`]es from a distributor channel, decodes
/// each one with a [`ChunkReader`] and forwards the resulting frame to an
/// output port. Consumed by [`LineBatchProcessor::run`].
pub(super) struct LineBatchProcessor {
/// Index of this worker; only used to tag verbose log lines.
pub(super) worker_idx: usize,
/// Decoder invoked via `read_chunk` on the bytes of every received batch.
pub(super) chunk_reader: ChunkReader,
/// Counts the rows contained in a raw byte slice without decoding it; used
/// when only the row total is still needed.
pub(super) count_rows_fn: fn(&[u8]) -> usize,
/// Receiving end of the channel that delivers line batches to this worker.
pub(super) line_batch_rx: distributor_channel::Receiver<LineBatch>,
/// Destination for decoded frames.
pub(super) output_port: LineBatchProcessorOutputPort,
/// When `true`, `run` keeps draining the channel after the decode loop has
/// finished and counts the rows of the remaining batches.
pub(super) needs_total_row_count: bool,
/// When `true`, progress messages are written to stderr.
pub(super) verbose: bool,
}
impl LineBatchProcessor {
    /// Runs the worker to completion and returns the number of rows it saw.
    ///
    /// The work happens in up to two phases:
    ///
    /// 1. **Decode** - skipped when the output port starts out `Closed`. Each
    ///    received batch is decoded, its height is added to the running total
    ///    and the frame is sent to the output port. The phase ends when the
    ///    channel is exhausted or the port rejects a frame.
    /// 2. **Count** - only when `needs_total_row_count` is set. The remaining
    ///    batches are not decoded; their rows are merely counted with
    ///    `count_rows_fn`.
    ///
    /// The total uses saturating addition, so it never wraps.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by `ChunkReader::read_chunk`.
    pub(super) async fn run(self) -> PolarsResult<usize> {
        let Self {
            worker_idx,
            chunk_reader,
            count_rows_fn,
            line_batch_rx: mut batches,
            output_port: mut port,
            needs_total_row_count,
            verbose,
        } = self;

        if verbose {
            eprintln!(
                "[NDJSON LineBatchProcessor {}]: begin run(): port_type: {}",
                worker_idx,
                port.port_type()
            );
        }

        let mut row_total = 0usize;

        // Phase 1: decode and forward for as long as something accepts frames.
        let port_is_open = !matches!(port, LineBatchProcessorOutputPort::Closed);
        if port_is_open {
            loop {
                let Ok(LineBatch { bytes, chunk_idx }) = batches.recv().await else {
                    // Channel exhausted: nothing left to decode.
                    break;
                };

                let df = chunk_reader.read_chunk(&bytes)?;
                row_total = row_total.saturating_add(df.height());

                let seq = MorselSeq::new(chunk_idx as u64);
                let delivered = port.send(seq, df).await;
                if delivered.is_err() {
                    // The consumer went away; stop producing frames.
                    break;
                }
            }
        }

        // Phase 2: only the row total is still of interest.
        if needs_total_row_count {
            if verbose {
                eprintln!("[NDJSON LineBatchProcessor {worker_idx}]: entering row count mode");
            }

            while let Ok(batch) = batches.recv().await {
                let n_rows = count_rows_fn(&batch.bytes);
                row_total = row_total.saturating_add(n_rows);
            }
        }

        if verbose {
            eprintln!("[NDJSON LineBatchProcessor {worker_idx}]: returning");
        }

        Ok(row_total)
    }
}
/// A unit of work handed to a [`LineBatchProcessor`].
pub(super) struct LineBatch {
/// Raw bytes of the batch, passed to the chunk reader or the row counter.
/// NOTE(review): presumably holds only complete NDJSON lines - confirm
/// against the producer.
pub(super) bytes: Buffer<u8>,
/// Index of this batch; converted into the `MorselSeq` of the emitted frame.
pub(super) chunk_idx: usize,
}
/// Destination to which a [`LineBatchProcessor`] sends decoded frames.
pub(super) enum LineBatchProcessorOutputPort {
/// Each frame is wrapped in a `Morsel` (carrying a clone of `source_token`)
/// and sent through `tx`.
Direct {
tx: FileReaderOutputSend,
source_token: SourceToken,
},
/// Each frame is inserted into a linearizer, prioritised by its reversed
/// morsel sequence.
Linearize {
tx: Inserter<Priority<Reverse<MorselSeq>, DataFrame>>,
},
/// Nothing accepts frames. `send` switches the port to this state after a
/// failed send, and must not be called again afterwards.
Closed,
}
impl LineBatchProcessorOutputPort {
fn port_type(&self) -> &'static str {
use LineBatchProcessorOutputPort::*;
match self {
Direct { .. } => "direct",
Linearize { .. } => "linearize",
Closed => "closed",
}
}
async fn send(&mut self, morsel_seq: MorselSeq, df: DataFrame) -> Result<(), ()> {
use LineBatchProcessorOutputPort::*;
let result = async {
match self {
Direct { tx, source_token } => {
let morsel = Morsel::new(df, morsel_seq, source_token.clone());
tx.send_morsel(morsel).await.map_err(|_| ())?;
Ok(())
},
Linearize { tx } => tx
.insert(Priority(Reverse(morsel_seq), df))
.await
.map_err(|_| ()),
Closed => unreachable!(),
}
}
.await;
if result.is_err() {
*self = Self::Closed;
}
result
}
}