use std::time::Duration;
use tokio_util::sync::CancellationToken;
use super::{BatchEngine, EngineError};
use crate::transport::codec::{self, ParsedPayload};
use crate::transport::{Record, TransportReceiver, WorkBatch};
/// Selects who acknowledges a block's commit tokens to the receiver.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitMode {
/// The run loop calls `receiver.commit` itself, after DLQ routing and the sink
/// have succeeded for the block.
Auto,
/// The run loop never calls `receiver.commit`. `run_workbatch_streaming` and the
/// budgeted path of `run_governed` reject this mode with
/// `EngineError::SinkManagedUnsupported`.
/// NOTE(review): presumably the sink acknowledges the tokens itself -- confirm.
SinkManaged,
}
/// A work batch whose record payloads have been decoded; this is what the
/// `process_parsed` callback of `run_workbatch_parsed` receives.
pub struct ParsedBatch<'a, T: crate::transport::CommitToken> {
/// Records kept after parsing. `parse_block` keeps only records whose payload
/// decoded successfully.
pub records: Vec<Record>,
/// Decoded payloads. `parse_block` pushes these in lockstep with `records`, so
/// index `i` here belongs to `records[i]`.
pub parsed: Vec<ParsedPayload>,
/// Commit tokens carried over unchanged from the received batch.
pub commit_tokens: Vec<T>,
/// Dead-letter entries: those already on the received batch plus, under
/// `ParseErrorAction::Dlq`, one entry per record that failed to parse.
pub dlq_entries: Vec<crate::transport::filter::FilteredDlqEntry>,
/// The engine's field interner, borrowed for the lifetime of the batch.
pub interner: &'a super::FieldInterner,
}
impl<T> ParsedBatch<'_, T>
where
    T: crate::transport::CommitToken,
{
    /// Number of records held by this batch.
    #[must_use]
    pub fn len(&self) -> usize {
        let kept = &self.records;
        kept.len()
    }

    /// `true` when the batch holds no records.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        let kept = &self.records;
        kept.is_empty()
    }

    /// Interns `name` through the borrowed field interner and returns the
    /// shared string it hands back.
    #[must_use]
    pub fn intern(&self, name: &str) -> std::sync::Arc<str> {
        let interner = self.interner;
        interner.intern(name)
    }
}
/// Optional periodic callback that the run loops poll as one `select!` branch.
#[cfg(feature = "transport")]
struct LoopTicker<F> {
// Timer driving the callback; `None` means no ticker was configured, in which
// case `wait` never completes.
interval: Option<tokio::time::Interval>,
// Callback invoked by `fire`; `None` means `fire` does nothing.
callback: Option<F>,
}
#[cfg(feature = "transport")]
impl<F, Fut> LoopTicker<F>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<(), EngineError>>,
{
    /// Builds the ticker state for a run loop.
    ///
    /// `None` yields a ticker whose `wait` never completes. With
    /// `Some((period, callback))` the first tick is due one `period` from now.
    ///
    /// A zero `period` disables the ticker (with a warning) instead of reaching
    /// `tokio::time::interval_at`, which panics on a zero period.
    fn new(ticker: Option<(Duration, F)>) -> Self {
        let Some((period, callback)) = ticker else {
            return Self {
                interval: None,
                callback: None,
            };
        };
        if period.is_zero() {
            tracing::warn!("Ticker period is zero -- ticker disabled");
            return Self {
                interval: None,
                callback: None,
            };
        }
        let mut interval = tokio::time::interval_at(tokio::time::Instant::now() + period, period);
        // The run loops do not poll the ticker while a block is being driven. With
        // the default `Burst` behaviour every tick missed during a slow block would
        // then fire back-to-back, ahead of the next receive. `Delay` fires once and
        // resumes the regular period from there.
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        Self {
            interval: Some(interval),
            callback: Some(callback),
        }
    }

    /// Completes at the next tick, or never when no ticker is configured.
    async fn wait(&mut self) {
        if let Some(interval) = self.interval.as_mut() {
            interval.tick().await;
        } else {
            std::future::pending::<()>().await;
        }
    }

    /// Runs the callback once. A callback error is logged under `label` and
    /// swallowed, so a failing ticker never stops the run loop.
    async fn fire(&mut self, label: &str) {
        if let Some(f) = self.callback.as_mut()
            && let Err(e) = f().await
        {
            tracing::error!(error = %e, ticker = label, "Ticker failed");
        }
    }
}
impl BatchEngine {
/// Runs the receive -> process -> sink -> commit loop until `shutdown` fires.
///
/// Each iteration waits, in priority order, for shutdown, the optional ticker,
/// or the next batch of up to `max_chunk_size` records. A received batch goes
/// through `ingest_workbatch`; if anything is left it is handed to
/// `drive_block`.
///
/// # Errors
/// Returns the first receive, ingest or `drive_block` error. Ticker errors are
/// only logged.
#[cfg(feature = "transport")]
// The loop is configured entirely through its arguments.
#[allow(clippy::too_many_arguments)]
pub async fn run_workbatch<R, P, Sink, SinkFut, Ticker, TickerFut>(
    &self,
    receiver: &R,
    shutdown: CancellationToken,
    process: P,
    mut sink: Sink,
    commit: CommitMode,
    ticker: Option<(Duration, Ticker)>,
) -> Result<(), EngineError>
where
    R: TransportReceiver,
    P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
    Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
    SinkFut: std::future::Future<Output = Result<(), EngineError>>,
    Ticker: FnMut() -> TickerFut,
    TickerFut: std::future::Future<Output = Result<(), EngineError>>,
{
    let chunk_size = self.config.max_chunk_size;
    tracing::info!(
        chunk_size,
        ?commit,
        ticker = ticker.is_some(),
        "BatchEngine (workbatch) starting"
    );

    let mut periodic = LoopTicker::new(ticker);
    loop {
        tokio::select! {
            biased;
            () = shutdown.cancelled() => {
                tracing::info!("BatchEngine (workbatch) shutting down");
                return Ok(());
            }
            () = periodic.wait() => {
                periodic.fire("workbatch").await;
            }
            received = receiver.recv(chunk_size) => {
                let raw = received.map_err(EngineError::Transport)?;
                // Nothing left after ingest: go straight back to waiting.
                if let Some(block) = self.ingest_workbatch(raw)? {
                    self.drive_block(receiver, block, &process, &mut sink, commit).await?;
                }
            }
        }
    }
}
/// Like `run_workbatch`, but every received block is processed and sunk in
/// sub-blocks of roughly `sub_block_bytes` payload bytes (see
/// `drive_block_streaming`) and committed once at the end of the block.
///
/// # Errors
/// Returns `EngineError::SinkManagedUnsupported` immediately when `commit` is
/// `CommitMode::SinkManaged`; otherwise the first receive, ingest or
/// `drive_block_streaming` error. Ticker errors are only logged.
#[cfg(feature = "transport")]
// The loop is configured entirely through its arguments.
#[allow(clippy::too_many_arguments)]
pub async fn run_workbatch_streaming<R, P, Sink, SinkFut, Ticker, TickerFut>(
    &self,
    receiver: &R,
    shutdown: CancellationToken,
    process: P,
    mut sink: Sink,
    commit: CommitMode,
    sub_block_bytes: u64,
    ticker: Option<(Duration, Ticker)>,
) -> Result<(), EngineError>
where
    R: TransportReceiver,
    P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
    Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
    SinkFut: std::future::Future<Output = Result<(), EngineError>>,
    Ticker: FnMut() -> TickerFut,
    TickerFut: std::future::Future<Output = Result<(), EngineError>>,
{
    if commit == CommitMode::SinkManaged {
        return Err(EngineError::SinkManagedUnsupported);
    }

    let chunk_size = self.config.max_chunk_size;
    tracing::info!(
        chunk_size,
        ?commit,
        sub_block_bytes,
        ticker = ticker.is_some(),
        "BatchEngine (workbatch streaming) starting"
    );

    let mut periodic = LoopTicker::new(ticker);
    loop {
        tokio::select! {
            biased;
            () = shutdown.cancelled() => {
                tracing::info!("BatchEngine (workbatch streaming) shutting down");
                return Ok(());
            }
            () = periodic.wait() => {
                periodic.fire("workbatch streaming").await;
            }
            received = receiver.recv(chunk_size) => {
                let raw = received.map_err(EngineError::Transport)?;
                // Nothing left after ingest: go straight back to waiting.
                if let Some(block) = self.ingest_workbatch(raw)? {
                    self.drive_block_streaming(
                        receiver,
                        block,
                        &process,
                        &mut sink,
                        commit,
                        sub_block_bytes,
                    )
                    .await?;
                }
            }
        }
    }
}
/// Runs the receive/process/sink loop with the engine's byte budget deciding how
/// much is pulled per receive and how large each processed sub-block is.
///
/// When the engine has no `byte_budget` this delegates to `run_workbatch` with the
/// same arguments (so `CommitMode::SinkManaged` is accepted on that path). With a
/// budget, `CommitMode::SinkManaged` is rejected up front.
///
/// Each iteration waits, in priority order, for shutdown, the optional ticker, or
/// a receive bounded by the budget. After every receive the budget is told how
/// many bytes arrived, how long the block took to drive, and how long it has been
/// since the previous receive completed.
///
/// # Errors
/// Returns `EngineError::SinkManagedUnsupported` for a budgeted run with
/// `CommitMode::SinkManaged`; otherwise the first receive, ingest or
/// `drive_block_streaming` error. Ticker errors are only logged.
#[cfg(all(feature = "transport", feature = "governor"))]
#[allow(clippy::too_many_arguments)]
pub async fn run_governed<R, P, Sink, SinkFut, Ticker, TickerFut>(
&self,
receiver: &R,
shutdown: CancellationToken,
process: P,
mut sink: Sink,
commit: CommitMode,
ticker: Option<(Duration, Ticker)>,
) -> Result<(), EngineError>
where
R: TransportReceiver,
P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
SinkFut: std::future::Future<Output = Result<(), EngineError>>,
Ticker: FnMut() -> TickerFut,
TickerFut: std::future::Future<Output = Result<(), EngineError>>,
{
// No budget configured: behave exactly like the plain work-batch loop.
let Some(budget) = self.byte_budget.clone() else {
return self
.run_workbatch(receiver, shutdown, process, sink, commit, ticker)
.await;
};
// The budgeted path drives blocks through `drive_block_streaming`, which is not
// offered for sink-managed commits.
if matches!(commit, CommitMode::SinkManaged) {
return Err(EngineError::SinkManagedUnsupported);
}
tracing::info!(
chunk_size = self.config.max_chunk_size,
commit = ?commit,
ticker = ticker.is_some(),
start_byte_budget = budget.byte_budget(),
"BatchEngine (governed) starting -- self-regulation ON"
);
let mut ticker = LoopTicker::new(ticker);
// Completion time of the previous receive; `None` until the first one.
let mut last_recv: Option<std::time::Instant> = None;
loop {
// Recomputed every iteration so each receive sees the budget's current values.
let recv_limits = crate::transport::RecvLimits {
max_records: self.config.max_chunk_size.min(budget.record_cap()),
max_bytes: budget.byte_budget(),
};
tokio::select! {
biased;
() = shutdown.cancelled() => {
tracing::info!("BatchEngine (governed) shutting down");
return Ok(());
}
() = ticker.wait() => ticker.fire("governed").await,
recv_result = receiver.recv_limited(recv_limits) => {
// Time between two consecutive receive completions; zero for the first receive.
let now = std::time::Instant::now();
let ingest_interval = last_recv
.map(|prev| now.saturating_duration_since(prev))
.unwrap_or_default();
last_recv = Some(now);
let work_batch = recv_result.map_err(EngineError::Transport)?;
// Measured on the batch as received, before `ingest_workbatch` touches it.
let block_bytes = work_batch.total_payload_bytes() as u64;
let Some(batch) = self.ingest_workbatch(work_batch)? else {
// Nothing to drive: report an empty observation but keep the interval.
budget.observe(0, Duration::ZERO, ingest_interval);
continue;
};
// The sub-block size is the budget as it stands right now.
let sub_block_bytes = budget.byte_budget();
// `process_time` spans the whole `drive_block_streaming` call (process, DLQ
// routing, sink and commit), not just the `process` callback.
let process_start = std::time::Instant::now();
self.drive_block_streaming(
receiver, batch, &process, &mut sink, commit, sub_block_bytes,
)
.await?;
let process_time = process_start.elapsed();
budget.observe(block_bytes, process_time, ingest_interval);
#[cfg(feature = "metrics")]
{
// NOTE(review): the next two gauges publish the same value under two names --
// presumably one is kept as an alias; confirm before removing either.
metrics::gauge!("self_regulation_byte_budget")
.set(budget.byte_budget() as f64);
metrics::gauge!("self_regulation_byte_budget_bytes")
.set(budget.byte_budget() as f64);
metrics::gauge!("self_regulation_recv_block_bytes")
.set(block_bytes as f64);
metrics::gauge!("self_regulation_pressure_ratio")
.set(budget.pressure().level());
}
}
}
}
}
/// Like `run_workbatch`, but each block's payloads are decoded with
/// `parse_block` first and the callback receives the resulting `ParsedBatch`.
///
/// # Errors
/// Returns the first receive, ingest, parse, callback, DLQ, sink or commit
/// error. Ticker errors are only logged.
#[cfg(feature = "transport")]
// The loop is configured entirely through its arguments.
#[allow(clippy::too_many_arguments)]
pub async fn run_workbatch_parsed<R, P, Sink, SinkFut, Ticker, TickerFut>(
    &self,
    receiver: &R,
    shutdown: CancellationToken,
    process_parsed: P,
    mut sink: Sink,
    commit: CommitMode,
    ticker: Option<(Duration, Ticker)>,
) -> Result<(), EngineError>
where
    R: TransportReceiver,
    P: Fn(ParsedBatch<'_, R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
    Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
    SinkFut: std::future::Future<Output = Result<(), EngineError>>,
    Ticker: FnMut() -> TickerFut,
    TickerFut: std::future::Future<Output = Result<(), EngineError>>,
{
    let chunk_size = self.config.max_chunk_size;
    tracing::info!(
        chunk_size,
        ?commit,
        ticker = ticker.is_some(),
        "BatchEngine (workbatch parsed) starting"
    );

    // Adapter that gives `drive_block` the plain `WorkBatch -> WorkBatch` shape it
    // expects: decode first, then run the caller's callback on the decoded batch.
    let parse_then_process =
        |raw: WorkBatch<R::Token>| -> Result<WorkBatch<R::Token>, EngineError> {
            process_parsed(self.parse_block(raw)?)
        };

    let mut periodic = LoopTicker::new(ticker);
    loop {
        tokio::select! {
            biased;
            () = shutdown.cancelled() => {
                tracing::info!("BatchEngine (workbatch parsed) shutting down");
                return Ok(());
            }
            () = periodic.wait() => {
                periodic.fire("workbatch parsed").await;
            }
            received = receiver.recv(chunk_size) => {
                let raw = received.map_err(EngineError::Transport)?;
                // Nothing left after ingest: go straight back to waiting.
                if let Some(block) = self.ingest_workbatch(raw)? {
                    self.drive_block(receiver, block, &parse_then_process, &mut sink, commit)
                        .await?;
                }
            }
        }
    }
}
/// Applies the work-batch DLQ policy to a freshly received batch.
///
/// Returns `Ok(None)` when the resulting batch has neither records nor commit
/// tokens, so the caller can skip it; otherwise `Ok(Some(batch))`.
///
/// # Errors
/// Propagates any error from `apply_workbatch_dlq_policy`.
#[cfg(feature = "transport")]
fn ingest_workbatch<T: crate::transport::CommitToken>(
    &self,
    batch: WorkBatch<T>,
) -> Result<Option<WorkBatch<T>>, EngineError> {
    let filtered = self.apply_workbatch_dlq_policy(batch)?;
    let nothing_left = filtered.records.is_empty() && filtered.commit_tokens.is_empty();
    Ok(if nothing_left { None } else { Some(filtered) })
}
/// Drives one block through `process`, DLQ routing, the sink and (in
/// `CommitMode::Auto`) the receiver commit, in that order.
///
/// Every failure is terminal: the error is logged and returned before any later
/// step runs, so nothing is committed unless DLQ routing and the sink succeeded.
/// The sink is skipped when `process` returns no records.
///
/// # Errors
/// Returns the error from `process`, `route_dlq_entries` or the sink as is, and
/// a commit failure wrapped in `EngineError::Transport`.
#[cfg(feature = "transport")]
async fn drive_block<R, P, Sink, SinkFut>(
    &self,
    receiver: &R,
    batch: WorkBatch<R::Token>,
    process: &P,
    sink: &mut Sink,
    commit: CommitMode,
) -> Result<(), EngineError>
where
    R: TransportReceiver,
    P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
    Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
    SinkFut: std::future::Future<Output = Result<(), EngineError>>,
{
    // Held until this function returns.
    #[cfg(feature = "memory")]
    let _ingress_lease = self.lease_ingress_batch(&batch);

    let tokens_in = batch.commit_tokens.len();
    let mut processed = process(batch)?;
    let tokens_out = processed.commit_tokens.len();
    if tokens_in != tokens_out {
        tracing::warn!(
            input_tokens = tokens_in,
            output_tokens = tokens_out,
            "process() changed the commit-token count -- the run contract is \
             that process preserves source acks (transform records, not \
             tokens). A drop toward zero will under-commit and stall the \
             source offset; use map_records, not WorkBatch::from_records."
        );
    }

    // Route dead letters first; the sink always sees the batch without them.
    let dead_letters = std::mem::take(&mut processed.dlq_entries);
    if !dead_letters.is_empty() {
        self.route_dlq_entries(dead_letters).map_err(|e| {
            tracing::error!(error = %e, "DLQ route failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
            e
        })?;
    }

    if !processed.records.is_empty() {
        sink(&processed).await.map_err(|e| {
            tracing::error!(error = %e, "Sink failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
            e
        })?;
    }

    match commit {
        // Committing is not the run loop's job in this mode.
        CommitMode::SinkManaged => {}
        CommitMode::Auto => {
            receiver
                .commit(&processed.commit_tokens)
                .await
                .map_err(|e| {
                    tracing::error!(error = %e, "Commit failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
                    EngineError::Transport(e)
                })?;
        }
    }
    Ok(())
}
#[cfg(feature = "transport")]
async fn drive_block_streaming<R, P, Sink, SinkFut>(
&self,
receiver: &R,
batch: WorkBatch<R::Token>,
process: &P,
sink: &mut Sink,
commit: CommitMode,
sub_block_bytes: u64,
) -> Result<(), EngineError>
where
R: TransportReceiver,
P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
SinkFut: std::future::Future<Output = Result<(), EngineError>>,
{
let WorkBatch {
records,
commit_tokens,
..
} = batch;
let mut sub_blocks = SubBlockDrain::new(records, sub_block_bytes);
while let Some(sub_records) = sub_blocks.next_sub_block() {
let sub_block: WorkBatch<R::Token> = WorkBatch::from_records(sub_records);
#[cfg(feature = "memory")]
let _sub_lease = self.lease_ingress_batch(&sub_block);
let mut out_sub = process(sub_block)?;
if !out_sub.dlq_entries.is_empty() {
let entries = std::mem::take(&mut out_sub.dlq_entries);
if let Err(e) = self.route_dlq_entries(entries) {
tracing::error!(error = %e, "DLQ route failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
return Err(e);
}
}
if let Err(e) = sink(&out_sub).await {
tracing::error!(error = %e, "Sink failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
return Err(e);
}
}
match commit {
CommitMode::Auto => {
if let Err(e) = receiver.commit(&commit_tokens).await {
tracing::error!(error = %e, "Commit failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
return Err(EngineError::Transport(e));
}
}
CommitMode::SinkManaged => {
}
}
Ok(())
}
/// Test helper: splits `records` into the same sub-blocks the streaming path
/// would produce for `target_bytes`, collected into a vector.
#[cfg(all(test, feature = "transport"))]
fn split_into_sub_blocks(records: Vec<Record>, target_bytes: u64) -> Vec<Vec<Record>> {
    let mut drain = SubBlockDrain::new(records, target_bytes);
    std::iter::from_fn(|| drain.next_sub_block()).collect()
}
/// Decodes every record payload of `batch` and splits the result into kept
/// records and dead letters.
///
/// Payloads are parsed through `self.pool` using each record's own
/// `metadata.format`. Records that parse are kept, in order, together with their
/// decoded payload. For a record that fails to parse the error counter is
/// bumped and `config.parse_error_action` decides:
///
/// * `Dlq` - bump the DLQ counter and append an entry carrying the payload, the
///   key and the reason.
/// * `Skip` - drop the record.
/// * `FailBatch` - abort the whole batch.
///
/// Commit tokens and pre-existing DLQ entries are carried over.
///
/// # Errors
/// Returns `EngineError::ParseBatchFailed` on the first parse failure when the
/// action is `FailBatch`.
#[cfg(feature = "transport")]
fn parse_block<T: crate::transport::CommitToken>(
    &self,
    batch: WorkBatch<T>,
) -> Result<ParsedBatch<'_, T>, EngineError> {
    use super::ParseErrorAction;
    use crate::transport::PayloadFormat;

    let WorkBatch {
        records,
        commit_tokens,
        mut dlq_entries,
    } = batch;

    let parsed_each: Vec<(Record, Result<ParsedPayload, String>)> =
        self.pool.map_owned(records, |record| {
            let format: PayloadFormat = record.metadata.format;
            let result =
                codec::parse(&record.payload, format).map_err(|e| format!("parse error: {e}"));
            (record, result)
        });

    let action = self.config.parse_error_action;
    // At most one kept entry per input record, so size both vectors up front.
    let mut keep_records = Vec::with_capacity(parsed_each.len());
    let mut keep_parsed = Vec::with_capacity(parsed_each.len());

    for (record, result) in parsed_each {
        let reason = match result {
            Ok(payload) => {
                keep_records.push(record);
                keep_parsed.push(payload);
                continue;
            }
            Err(reason) => reason,
        };

        // Every failure counts as an error, whatever the configured action.
        self.stats.incr_errors();
        match action {
            ParseErrorAction::Dlq => {
                self.stats.incr_dlq();
                dlq_entries.push(crate::transport::filter::FilteredDlqEntry {
                    payload: record.payload.to_vec(),
                    // The record is owned and discarded here, so move the key.
                    key: record.key,
                    reason,
                });
            }
            ParseErrorAction::Skip => {}
            ParseErrorAction::FailBatch => {
                return Err(EngineError::ParseBatchFailed(reason));
            }
        }
    }

    Ok(ParsedBatch {
        records: keep_records,
        parsed: keep_parsed,
        commit_tokens,
        dlq_entries,
        interner: &self.interner,
    })
}
/// Registers the batch's payload bytes with the memory guard and returns a lease
/// for them, or `None` when the engine has no memory guard.
#[cfg(feature = "memory")]
pub(crate) fn lease_ingress_batch<T: crate::transport::CommitToken>(
    &self,
    batch: &WorkBatch<T>,
) -> Option<super::IngressLease<'_>> {
    self.memory_guard.as_ref().map(|guard| {
        let payload_bytes = batch.total_payload_bytes() as u64;
        guard.add_bytes(payload_bytes);
        super::IngressLease::new(guard, payload_bytes)
    })
}
}
/// Hands out the records of one block as consecutive sub-blocks, each bounded by
/// a target number of payload bytes.
#[cfg(feature = "transport")]
struct SubBlockDrain {
// Records not yet handed out.
iter: std::vec::IntoIter<Record>,
// Record that was pulled from `iter` but did not fit into the previous
// sub-block; it becomes the first record of the next one.
peeked: Option<Record>,
// Payload-byte target per sub-block.
target_bytes: u64,
}
#[cfg(feature = "transport")]
impl SubBlockDrain {
    /// Starts draining `records` into sub-blocks of at most `target_bytes`
    /// payload bytes.
    fn new(records: Vec<Record>, target_bytes: u64) -> Self {
        let iter = records.into_iter();
        Self {
            iter,
            peeked: None,
            target_bytes,
        }
    }

    /// Returns the next sub-block, or `None` once all records are handed out.
    ///
    /// A sub-block always contains at least one record, even if that record
    /// alone exceeds the target. Further records are added only while the
    /// running payload total stays within `target_bytes`.
    fn next_sub_block(&mut self) -> Option<Vec<Record>> {
        // Start with the record carried over from the previous call, if any.
        let head = match self.peeked.take() {
            Some(carried) => carried,
            None => self.iter.next()?,
        };
        let mut block_bytes = head.payload.len() as u64;
        let mut block = vec![head];

        for candidate in self.iter.by_ref() {
            let grown = block_bytes.saturating_add(candidate.payload.len() as u64);
            if grown > self.target_bytes {
                // Does not fit: keep it for the next sub-block.
                self.peeked = Some(candidate);
                break;
            }
            block_bytes = grown;
            block.push(candidate);
        }
        Some(block)
    }
}
// Unit tests live in `driver_tests.rs` next to this file.
// NOTE(review): this gate uses a feature named "transport-memory", while the rest
// of the file gates on "transport" and "memory" separately -- confirm that feature
// is declared, otherwise these tests are never compiled.
#[cfg(all(test, feature = "transport-memory"))]
#[path = "driver_tests.rs"]
mod tests;