use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use crate::EventBusError;
use super::backend::StreamBackend;
use super::observer::{ErrorObserver, ErrorScope};
/// Batch size substituted by [`spawn`] when the caller passes `0`.
const DEFAULT_ACK_BATCH_SIZE: usize = 64;
/// Flush interval substituted by [`spawn`] when the caller passes a zero duration.
const DEFAULT_ACK_FLUSH_INTERVAL: Duration = Duration::from_millis(2);
/// One acknowledgement queued for the batching task started by [`spawn`].
///
/// Requests are sent over the `mpsc::Sender<AckRequest>` that [`spawn`] returns.
pub(super) struct AckRequest {
/// Identifier to acknowledge; collected with the rest of the batch and passed
/// to the backend's `ack_many` call.
pub id: String,
/// Completion channel: receives `Ok(())` when the batch containing `id` was
/// acknowledged, or an `EventBusError` when the batched call failed.
pub done: oneshot::Sender<Result<(), EventBusError>>,
}
/// Starts the background task that coalesces acknowledgements into batches.
///
/// Each [`AckRequest`] sent on the returned channel is buffered and later
/// acknowledged together with its neighbours through a single
/// `backend.ack_many(stream, group, ids)` call (see `flush_batch`). A batch is
/// flushed as soon as it holds `batch_size` requests, or once `flush_interval`
/// has elapsed since the batch's first request was picked up, whichever
/// happens first.
///
/// # Parameters
/// - `backend`: backend used to perform the batched acknowledgement.
/// - `stream`, `group`: forwarded verbatim to every `ack_many` call.
/// - `batch_size`: maximum number of requests per batch; `0` selects
///   `DEFAULT_ACK_BATCH_SIZE`.
/// - `flush_interval`: maximum time to wait for a partially filled batch to
///   grow; a zero duration selects `DEFAULT_ACK_FLUSH_INTERVAL`.
/// - `error_observer`: optional observer notified when a batched
///   acknowledgement fails.
///
/// # Returns
/// The sending half of the request channel and the handle of the spawned
/// task. The task finishes after every sender has been dropped and all
/// already-queued requests have been flushed.
pub(super) fn spawn<B: StreamBackend>(
    backend: Arc<B>,
    stream: String,
    group: String,
    batch_size: usize,
    flush_interval: Duration,
    error_observer: Option<Arc<dyn ErrorObserver>>,
) -> (mpsc::Sender<AckRequest>, JoinHandle<()>) {
    // Zero means "not configured": fall back to the module defaults.
    let batch_size = if batch_size == 0 {
        DEFAULT_ACK_BATCH_SIZE
    } else {
        batch_size
    };
    let flush_interval = if flush_interval.is_zero() {
        DEFAULT_ACK_FLUSH_INTERVAL
    } else {
        flush_interval
    };

    // Room for several batches in flight, never fewer than 64 slots.
    let channel_cap = batch_size.saturating_mul(4).max(64);
    let (tx, mut rx) = mpsc::channel::<AckRequest>(channel_cap);

    let handle = tokio::spawn(async move {
        let mut buf: Vec<AckRequest> = Vec::with_capacity(batch_size);

        // `recv` yields `None` only when every sender is gone AND the queue is
        // empty, so nothing can be left behind once this loop ends; `buf` is
        // likewise empty at that point because each iteration finishes with a
        // flush that drains it. No post-loop drain is therefore required.
        while let Some(first) = rx.recv().await {
            buf.push(first);

            // Fast path: take whatever is already queued without waiting.
            while buf.len() < batch_size {
                match rx.try_recv() {
                    Ok(req) => buf.push(req),
                    Err(_) => break,
                }
            }

            // Slow path: give the batch up to `flush_interval` to fill.
            if buf.len() < batch_size {
                let deadline = tokio::time::sleep(flush_interval);
                tokio::pin!(deadline);
                while buf.len() < batch_size {
                    tokio::select! {
                        // Prefer pending requests over the timer so a ready
                        // request is never skipped in favour of the deadline.
                        biased;
                        maybe = rx.recv() => match maybe {
                            Some(req) => buf.push(req),
                            // Channel closed: flush what we have; the outer
                            // `recv` will then observe `None` and stop.
                            None => break,
                        },
                        _ = &mut deadline => break,
                    }
                }
            }

            flush_batch(
                &backend,
                &stream,
                &group,
                &mut buf,
                error_observer.as_deref(),
            )
            .await;
        }
    });

    (tx, handle)
}
/// Acknowledges every request in `buf` with one backend call and reports the
/// outcome to each waiter.
///
/// `buf` is drained (left empty, capacity retained) so the caller can reuse
/// it for the next batch. Does nothing when `buf` is already empty.
///
/// On success every request's `done` channel receives `Ok(())`. On failure
/// `error_observer`, if present, is notified once with
/// `ErrorScope::AckFlush`, and every `done` channel receives an
/// `EventBusError::Connection` carrying the same message. Waiters that have
/// already dropped their receiver are silently skipped.
async fn flush_batch<B: StreamBackend>(
    backend: &Arc<B>,
    stream: &str,
    group: &str,
    buf: &mut Vec<AckRequest>,
    error_observer: Option<&dyn ErrorObserver>,
) {
    if buf.is_empty() {
        return;
    }

    // Split each request into the id sent to the backend and the channel used
    // to report back; both vectors keep the same order.
    let (ids, dones): (Vec<String>, Vec<_>) =
        buf.drain(..).map(|req| (req.id, req.done)).unzip();

    match backend.ack_many(stream, group, &ids).await {
        Ok(()) => {
            for done in dones {
                // A send error only means the waiter went away; ignore it.
                let _ = done.send(Ok(()));
            }
        }
        Err(err) => {
            if let Some(obs) = error_observer {
                obs.on_error(ErrorScope::AckFlush, &err);
            }
            // The message is identical for every waiter: build it once and
            // hand out copies instead of re-formatting per request.
            let detail = format!("batched xack on {stream}: {err}");
            for done in dones {
                let _ = done.send(Err(EventBusError::Connection(detail.clone())));
            }
        }
    }
}