use std::time::Duration;
use exponential_backoff::Backoff;
use tokio::sync::mpsc;
use super::{event::Event, event_delivery::EventDelivery, BatchedMessage};
#[derive(Debug, PartialEq)]
pub(super) struct DeliveryStatus {
pub success: Vec<Event>,
pub failure: Vec<Event>,
pub retry: Vec<Event>,
}
impl DeliveryStatus {
pub fn new(success: Vec<Event>, failure: Vec<Event>, retry: Vec<Event>) -> Self {
DeliveryStatus {
success,
failure,
retry,
}
}
pub fn success(success: Vec<Event>) -> Self {
DeliveryStatus {
success,
failure: Vec::new(),
retry: Vec::new(),
}
}
pub fn failure(failure: Vec<Event>) -> Self {
DeliveryStatus {
success: Vec::new(),
retry: Vec::new(),
failure,
}
}
pub fn retry(retry: Vec<Event>) -> Self {
DeliveryStatus {
success: Vec::new(),
retry,
failure: Vec::new(),
}
}
}
#[derive(Debug, Clone, Copy)]
pub(super) struct DeliveryConfig {
pub max_retries: u32,
pub base_retry_delay: Duration,
pub max_retry_delay: Duration,
}
pub(super) async fn delivery(
mut uplink: mpsc::Receiver<BatchedMessage<Event>>,
delivery_status: mpsc::Sender<DeliveryStatus>,
event_delivery: EventDelivery,
config: DeliveryConfig,
) -> Option<()> {
let (retries_tx, mut retries_rx) =
mpsc::unbounded_channel::<(/* attempts: */ u32, BatchedMessage<Event>)>();
loop {
let (attempts, msg) = tokio::select! {
msg = uplink.recv() => (0, msg?),
msg = retries_rx.recv() => msg?,
};
let BatchedMessage { batch, flush } = msg;
let mut result = event_delivery.deliver(batch).await;
if attempts >= config.max_retries {
result.failure.append(&mut result.retry);
}
let retry_batch = std::mem::take(&mut result.retry);
let _ = delivery_status.send(result).await;
if retry_batch.is_empty() {
} else {
let retries_tx = retries_tx.clone();
tokio::spawn(async move {
wait_exponential_backoff(
attempts,
config.max_retries,
config.base_retry_delay,
config.max_retry_delay,
)
.await;
let _ = retries_tx.send((
attempts + 1,
BatchedMessage {
batch: retry_batch,
flush,
},
));
});
}
}
}
async fn wait_exponential_backoff(
attempts: u32,
max_retries: u32,
min_retry_delay: Duration,
max_retry_delay: Duration,
) {
let backoff = Backoff::new(max_retries + 1, min_retry_delay, max_retry_delay);
let delay = backoff
.iter()
.skip(attempts as usize)
.take(1)
.next()
.flatten()
.unwrap_or(max_retry_delay);
log::debug!(target: "eppo", "retry waiting for {:?}", delay);
tokio::time::sleep(delay).await;
}