use std::sync::mpsc::{SyncSender, sync_channel};
use std::thread::JoinHandle;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use super::ExportSink;
use crate::error::Result;
use crate::plan::ResolvedRunPlan;
use crate::source::BatchSink;
/// Default capacity of the bounded channel between the producer and the encode
/// worker: how many schema/batch messages may be queued before sending blocks.
const DEFAULT_CHANNEL_DEPTH: usize = 3;
/// A unit of work handed from the producing thread to the encode worker.
///
/// The worker applies messages to its sink one at a time, in the order they were sent.
enum SinkMsg {
    /// Forwarded to the wrapped sink's `on_schema`.
    Schema(SchemaRef),
    /// Forwarded to the wrapped sink's `on_batch`.
    Batch(RecordBatch),
}
/// A [`BatchSink`] that hands schemas and batches over a bounded channel to a
/// background thread, which feeds them into an [`ExportSink`].
///
/// The producer keeps running while the worker processes queued messages; it only
/// blocks when the channel is full. Call [`PipelinedSink::finish`] to close the
/// channel, wait for the worker, and get the `ExportSink` back (or the first error
/// it hit). Dropping the value instead throws that result away.
#[must_use = "call `finish()` to wait for the encode worker and observe its errors"]
pub(crate) struct PipelinedSink {
    /// Sending half of the bounded channel; `None` once the channel has been closed.
    tx: Option<SyncSender<SinkMsg>>,
    /// Handle of the encode thread; joining it yields the sink back (or its error).
    worker: Option<JoinHandle<Result<ExportSink>>>,
}
impl PipelinedSink {
pub fn enabled() -> bool {
Self::configured_depth().is_some()
}
fn configured_depth() -> Option<usize> {
match std::env::var("RIVET_PIPELINE_WRITES") {
Ok(v) if v.is_empty() || v == "0" => None,
Ok(v) => Some(
v.parse::<usize>()
.ok()
.filter(|&n| n > 0)
.unwrap_or(DEFAULT_CHANNEL_DEPTH),
),
Err(_) => None,
}
}
pub fn spawn(plan: &ResolvedRunPlan) -> Result<Self> {
let plan = plan.clone();
let depth = Self::configured_depth().unwrap_or(DEFAULT_CHANNEL_DEPTH);
let (tx, rx) = sync_channel::<SinkMsg>(depth);
let worker = std::thread::Builder::new()
.name("rivet-encode".to_string())
.spawn(move || -> Result<ExportSink> {
let mut sink = ExportSink::new(&plan)?;
while let Ok(msg) = rx.recv() {
match msg {
SinkMsg::Schema(s) => sink.on_schema(s)?,
SinkMsg::Batch(b) => sink.on_batch(&b)?,
}
}
Ok(sink)
})?;
Ok(Self {
tx: Some(tx),
worker: Some(worker),
})
}
#[cfg(test)]
pub(crate) fn spawn_with_sink(sink: ExportSink) -> Self {
let (tx, rx) = sync_channel::<SinkMsg>(DEFAULT_CHANNEL_DEPTH);
let worker = std::thread::Builder::new()
.name("rivet-encode-test".to_string())
.spawn(move || -> Result<ExportSink> {
let mut sink = sink;
while let Ok(msg) = rx.recv() {
match msg {
SinkMsg::Schema(s) => sink.on_schema(s)?,
SinkMsg::Batch(b) => sink.on_batch(&b)?,
}
}
Ok(sink)
})
.expect("spawn test encode worker");
Self {
tx: Some(tx),
worker: Some(worker),
}
}
fn send(&mut self, msg: SinkMsg) -> Result<()> {
match self.tx.as_ref() {
Some(tx) => tx
.send(msg)
.map_err(|_| anyhow::anyhow!("pipelined encode worker stopped")),
None => Err(anyhow::anyhow!("pipelined sink already finished")),
}
}
pub fn finish(mut self) -> Result<ExportSink> {
self.tx.take();
match self.worker.take() {
Some(handle) => match handle.join() {
Ok(result) => result,
Err(_) => Err(anyhow::anyhow!("pipelined encode worker panicked")),
},
None => Err(anyhow::anyhow!("pipelined sink has no worker")),
}
}
}
impl BatchSink for PipelinedSink {
fn on_schema(&mut self, schema: SchemaRef) -> Result<()> {
self.send(SinkMsg::Schema(schema))
}
fn on_batch(&mut self, batch: &RecordBatch) -> Result<()> {
self.send(SinkMsg::Batch(batch.clone()))
}
}