rivet-cli 0.10.0

Rivet: PostgreSQL/MySQL/SQL Server → Parquet/CSV (local, S3, GCS, Azure). Crate name rivet-cli; binary rivet.
Documentation
//! **Layer: Execution**
//!
//! `PipelinedSink` decouples the source fetch/convert stage from the
//! encode/compress/write stage. The source thread only fetches rows, converts
//! them to Arrow, and hands each `RecordBatch` to a bounded channel; a dedicated
//! worker thread owns the real [`ExportSink`] and does the CPU-heavy work
//! (parquet encode + compression, quality hashing, file rotation).
//!
//! This overlaps the database round-trip wait with the compression CPU work,
//! which today run serially on one thread (see `on_batch` in [`super`]).
//!
//! Ordering and invariants are preserved: a single FIFO worker processes
//! batches in arrival order, so `content_fingerprint` determinism (ADR-0012)
//! and the I1–I8 ordering contracts are unaffected.
//!
//! Memory: the synchronous path holds ~1–2 batches in flight (one being
//! converted, one being encoded). Pipelining raises the ceiling to roughly
//! `(DEPTH + 2) × batch_size` — the in-channel queue plus the batch the
//! encoder is working on plus the one the fetcher is building. It stays
//! *bounded*: the `sync_channel` blocks the fetch thread once the queue is
//! full (backpressure), and the per-batch byte budget (MySQL 64 MB cap / PG
//! `work_mem × 0.7`) is unchanged. `DEPTH` is the memory↔overlap knob:
//! lower it to spend less memory, raise it to absorb burstier fetch latency.
//!
//! **On by default** (depth [`DEFAULT_CHANNEL_DEPTH`]): the overlap is free —
//! no RSS penalty measured at depth 2, FIFO order preserved, and the
//! commit-critical finalize (`writer.finish()`) still runs on the caller's
//! thread after the worker is joined, so crash/ordering semantics are
//! unchanged. Override with `RIVET_PIPELINE_WRITES`: `0` disables it (the old
//! synchronous path), a positive integer sets the channel depth (the
//! memory↔overlap knob). Wired into the single/snapshot export path
//! (`pipeline::single`); the chunked runners are not pipelined yet (each chunk
//! already runs on its own worker — intra-chunk overlap there is a follow-up).

use std::sync::mpsc::{SyncSender, sync_channel};
use std::thread::JoinHandle;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;

use super::ExportSink;
use crate::error::Result;
use crate::plan::ResolvedRunPlan;
use crate::source::BatchSink;

/// Channel depth: how many batches may sit between fetch and encode. Small, so
/// peak extra memory stays at ~`DEPTH × batch_size` while still giving the
/// encoder a runway to stay busy across a fetch round-trip.
const DEFAULT_CHANNEL_DEPTH: usize = 3;

enum SinkMsg {
    Schema(SchemaRef),
    Batch(RecordBatch),
}

/// A `BatchSink` that forwards schema/batches to a background encode worker.
pub(crate) struct PipelinedSink {
    tx: Option<SyncSender<SinkMsg>>,
    worker: Option<JoinHandle<Result<ExportSink>>>,
}

impl PipelinedSink {
    /// Returns `true` when the pipelined write path is requested.
    pub fn enabled() -> bool {
        Self::configured_depth().is_some()
    }

    /// Resolve the encode-worker channel depth. **On by default**
    /// ([`DEFAULT_CHANNEL_DEPTH`]) — the overlap is free (no RSS penalty
    /// measured at depth 2) and FIFO-ordered, so the source thread's DB
    /// round-trip wait overlaps the writer thread's parquet/zstd CPU. Disable
    /// with `RIVET_PIPELINE_WRITES=0`; tune the depth with a positive integer.
    fn configured_depth() -> Option<usize> {
        match std::env::var("RIVET_PIPELINE_WRITES") {
            Ok(v) if v == "0" => None, // explicit opt-out
            Ok(v) if v.is_empty() => Some(DEFAULT_CHANNEL_DEPTH),
            Ok(v) => Some(
                v.parse::<usize>()
                    .ok()
                    .filter(|&n| n > 0)
                    .unwrap_or(DEFAULT_CHANNEL_DEPTH),
            ),
            Err(_) => Some(DEFAULT_CHANNEL_DEPTH), // unset → on by default
        }
    }

    /// Spawn the encode worker. The worker creates and owns the `ExportSink`
    /// (so the writer trait object never crosses a thread boundary while in
    /// use); `finish` returns the fully-populated sink back to the caller.
    pub fn spawn(plan: &ResolvedRunPlan) -> Result<Self> {
        let plan = plan.clone();
        let depth = Self::configured_depth().unwrap_or(DEFAULT_CHANNEL_DEPTH);
        let (tx, rx) = sync_channel::<SinkMsg>(depth);
        let worker = std::thread::Builder::new()
            .name("rivet-encode".to_string())
            .spawn(move || -> Result<ExportSink> {
                let mut sink = ExportSink::new(&plan)?;
                while let Ok(msg) = rx.recv() {
                    match msg {
                        SinkMsg::Schema(s) => sink.on_schema(s)?,
                        SinkMsg::Batch(b) => sink.on_batch(&b)?,
                    }
                }
                Ok(sink)
            })?;
        Ok(Self {
            tx: Some(tx),
            worker: Some(worker),
        })
    }

    /// Test-only: wrap an already-built `ExportSink` (bypasses plan
    /// construction). Same worker loop as `spawn`.
    #[cfg(test)]
    pub(crate) fn spawn_with_sink(sink: ExportSink) -> Self {
        let (tx, rx) = sync_channel::<SinkMsg>(DEFAULT_CHANNEL_DEPTH);
        let worker = std::thread::Builder::new()
            .name("rivet-encode-test".to_string())
            .spawn(move || -> Result<ExportSink> {
                let mut sink = sink;
                while let Ok(msg) = rx.recv() {
                    match msg {
                        SinkMsg::Schema(s) => sink.on_schema(s)?,
                        SinkMsg::Batch(b) => sink.on_batch(&b)?,
                    }
                }
                Ok(sink)
            })
            .expect("spawn test encode worker");
        Self {
            tx: Some(tx),
            worker: Some(worker),
        }
    }

    fn send(&mut self, msg: SinkMsg) -> Result<()> {
        match self.tx.as_ref() {
            // A send error means the worker hit an error and dropped the
            // receiver. We return a generic abort here; `finish` joins the
            // worker and surfaces the *real* error, which the caller checks
            // before the source-side error (see `run_single_export`).
            Some(tx) => tx
                .send(msg)
                .map_err(|_| anyhow::anyhow!("pipelined encode worker stopped")),
            None => Err(anyhow::anyhow!("pipelined sink already finished")),
        }
    }

    /// Close the channel, join the worker, and recover the populated
    /// `ExportSink`. Propagates a worker error or panic.
    pub fn finish(mut self) -> Result<ExportSink> {
        // Drop the sender so the worker's `recv` loop terminates.
        self.tx.take();
        match self.worker.take() {
            Some(handle) => match handle.join() {
                Ok(result) => result,
                Err(_) => Err(anyhow::anyhow!("pipelined encode worker panicked")),
            },
            None => Err(anyhow::anyhow!("pipelined sink has no worker")),
        }
    }
}

impl BatchSink for PipelinedSink {
    fn on_schema(&mut self, schema: SchemaRef) -> Result<()> {
        self.send(SinkMsg::Schema(schema))
    }

    fn on_batch(&mut self, batch: &RecordBatch) -> Result<()> {
        // RecordBatch clone is a shallow Arc bump per column — cheap. The
        // batch crosses to the worker by value so the source loop can free
        // its own reference immediately on the next fetch.
        self.send(SinkMsg::Batch(batch.clone()))
    }
}