scouter_dataframe/parquet/bifrost/
buffer.rs1use crate::parquet::bifrost::engine::TableCommand;
2use arrow_array::RecordBatch;
3use tokio::sync::{mpsc, oneshot};
4use tokio::time::{interval, Duration};
5use tracing::{error, info};
6
7pub fn start_buffer(
13 engine_tx: mpsc::Sender<TableCommand>,
14 mut batch_rx: mpsc::Receiver<RecordBatch>,
15 mut shutdown_rx: mpsc::Receiver<()>,
16 flush_interval_secs: u64,
17 max_buffer_rows: usize,
18 table_fqn: String,
19) -> tokio::task::JoinHandle<()> {
20 tokio::spawn(async move {
21 let mut buffer: Vec<RecordBatch> = Vec::new();
22 let mut row_count: usize = 0;
23 let mut flush_ticker = interval(Duration::from_secs(flush_interval_secs));
24 flush_ticker.tick().await; loop {
27 tokio::select! {
28 batch_opt = batch_rx.recv() => {
29 match batch_opt {
30 Some(batch) => {
31 row_count += batch.num_rows();
32 buffer.push(batch);
33 if row_count >= max_buffer_rows {
34 flush(&engine_tx, &mut buffer, &mut row_count, &table_fqn).await;
35 }
36 }
37 None => {
38 if !buffer.is_empty() {
40 flush(&engine_tx, &mut buffer, &mut row_count, &table_fqn).await;
41 }
42 break;
43 }
44 }
45 }
46 _ = flush_ticker.tick() => {
47 if !buffer.is_empty() {
48 flush(&engine_tx, &mut buffer, &mut row_count, &table_fqn).await;
49 }
50 }
51 _ = shutdown_rx.recv() => {
52 if !buffer.is_empty() {
53 flush(&engine_tx, &mut buffer, &mut row_count, &table_fqn).await;
54 }
55 break;
56 }
57 }
58 }
59
60 info!("Buffer actor shut down for [{}]", table_fqn);
61 })
62}
63
64async fn flush(
65 engine_tx: &mpsc::Sender<TableCommand>,
66 buffer: &mut Vec<RecordBatch>,
67 row_count: &mut usize,
68 table_fqn: &str,
69) {
70 let batches = std::mem::take(buffer);
71 let flushed_rows = *row_count;
72 *row_count = 0;
73
74 let batches_backup = batches.clone();
76
77 let (tx, rx) = oneshot::channel();
78 if engine_tx
79 .send(TableCommand::Write {
80 batches,
81 respond_to: tx,
82 })
83 .await
84 .is_err()
85 {
86 error!(
87 "Engine channel closed for [{}] — restoring {} rows to buffer",
88 table_fqn, flushed_rows
89 );
90 *buffer = batches_backup;
91 *row_count = flushed_rows;
92 return;
93 }
94
95 match rx.await {
96 Ok(Ok(())) => {
97 info!("Flushed {} rows to engine [{}]", flushed_rows, table_fqn);
98 }
99 Ok(Err(e)) => {
100 error!(
101 "Write failed for [{}]: {} — restoring {} rows to buffer",
102 table_fqn, e, flushed_rows
103 );
104 *buffer = batches_backup;
105 *row_count = flushed_rows;
106 }
107 Err(_) => {
108 error!(
109 "Engine dropped response channel for [{}] — restoring {} rows to buffer",
110 table_fqn, flushed_rows
111 );
112 *buffer = batches_backup;
113 *row_count = flushed_rows;
114 }
115 }
116}