scouter_dataframe/parquet/bifrost/buffer.rs

1use crate::parquet::bifrost::engine::TableCommand;
2use arrow_array::RecordBatch;
3use tokio::sync::{mpsc, oneshot};
4use tokio::time::{interval, Duration};
5use tracing::{error, info};
6
7/// Per-table buffer actor that accumulates `RecordBatch` objects and flushes
8/// them to the engine actor on capacity or timer triggers.
9///
10/// Sends `Vec<RecordBatch>` directly to the engine — Delta Lake's `write()`
11/// accepts multiple batches natively, avoiding `concat_batches` copies.
12pub fn start_buffer(
13    engine_tx: mpsc::Sender<TableCommand>,
14    mut batch_rx: mpsc::Receiver<RecordBatch>,
15    mut shutdown_rx: mpsc::Receiver<()>,
16    flush_interval_secs: u64,
17    max_buffer_rows: usize,
18    table_fqn: String,
19) -> tokio::task::JoinHandle<()> {
20    tokio::spawn(async move {
21        let mut buffer: Vec<RecordBatch> = Vec::new();
22        let mut row_count: usize = 0;
23        let mut flush_ticker = interval(Duration::from_secs(flush_interval_secs));
24        flush_ticker.tick().await; // skip immediate
25
26        loop {
27            tokio::select! {
28                batch_opt = batch_rx.recv() => {
29                    match batch_opt {
30                        Some(batch) => {
31                            row_count += batch.num_rows();
32                            buffer.push(batch);
33                            if row_count >= max_buffer_rows {
34                                flush(&engine_tx, &mut buffer, &mut row_count, &table_fqn).await;
35                            }
36                        }
37                        None => {
38                            // Channel closed — flush and exit
39                            if !buffer.is_empty() {
40                                flush(&engine_tx, &mut buffer, &mut row_count, &table_fqn).await;
41                            }
42                            break;
43                        }
44                    }
45                }
46                _ = flush_ticker.tick() => {
47                    if !buffer.is_empty() {
48                        flush(&engine_tx, &mut buffer, &mut row_count, &table_fqn).await;
49                    }
50                }
51                _ = shutdown_rx.recv() => {
52                    if !buffer.is_empty() {
53                        flush(&engine_tx, &mut buffer, &mut row_count, &table_fqn).await;
54                    }
55                    break;
56                }
57            }
58        }
59
60        info!("Buffer actor shut down for [{}]", table_fqn);
61    })
62}
63
64async fn flush(
65    engine_tx: &mpsc::Sender<TableCommand>,
66    buffer: &mut Vec<RecordBatch>,
67    row_count: &mut usize,
68    table_fqn: &str,
69) {
70    let batches = std::mem::take(buffer);
71    let flushed_rows = *row_count;
72    *row_count = 0;
73
74    // Clone is O(n_columns) not O(n_rows) — columns are Arc<dyn Array>
75    let batches_backup = batches.clone();
76
77    let (tx, rx) = oneshot::channel();
78    if engine_tx
79        .send(TableCommand::Write {
80            batches,
81            respond_to: tx,
82        })
83        .await
84        .is_err()
85    {
86        error!(
87            "Engine channel closed for [{}] — restoring {} rows to buffer",
88            table_fqn, flushed_rows
89        );
90        *buffer = batches_backup;
91        *row_count = flushed_rows;
92        return;
93    }
94
95    match rx.await {
96        Ok(Ok(())) => {
97            info!("Flushed {} rows to engine [{}]", flushed_rows, table_fqn);
98        }
99        Ok(Err(e)) => {
100            error!(
101                "Write failed for [{}]: {} — restoring {} rows to buffer",
102                table_fqn, e, flushed_rows
103            );
104            *buffer = batches_backup;
105            *row_count = flushed_rows;
106        }
107        Err(_) => {
108            error!(
109                "Engine dropped response channel for [{}] — restoring {} rows to buffer",
110                table_fqn, flushed_rows
111            );
112            *buffer = batches_backup;
113            *row_count = flushed_rows;
114        }
115    }
116}