use std::fmt::{Debug, Display, Formatter};
use std::str::FromStr;
use std::time::Duration;
use derive_builder::Builder;
use fluvio_future::retry::{ExponentialBackoff, FibonacciBackoff, FixedDelay};
use fluvio_spu_schema::Isolation;
use fluvio_spu_schema::server::smartmodule::SmartModuleInvocation;
use fluvio_compression::Compression;
use serde::{Serialize, Deserialize};
use crate::producer::partitioning::{Partitioner, SiphashRoundRobinPartitioner};
#[cfg(feature = "stats")]
use crate::stats::ClientStatsDataCollect;
const DEFAULT_LINGER_MS: u64 = 100;
const DEFAULT_TIMEOUT_MS: u64 = 1500;
const DEFAULT_BATCH_SIZE_BYTES: usize = 16_384;
const DEFAULT_BATCH_QUEUE_SIZE: usize = 100;
const DEFAULT_RETRIES_TIMEOUT: Duration = Duration::from_secs(300);
const DEFAULT_INITIAL_DELAY: Duration = Duration::from_millis(20);
const DEFAULT_MAX_DELAY: Duration = Duration::from_secs(200);
const DEFAULT_MAX_RETRIES: usize = 4;
fn default_batch_size() -> usize {
DEFAULT_BATCH_SIZE_BYTES
}
fn default_batch_queue_size() -> usize {
DEFAULT_BATCH_QUEUE_SIZE
}
fn default_linger_duration() -> Duration {
Duration::from_millis(DEFAULT_LINGER_MS)
}
fn default_partitioner() -> Box<dyn Partitioner + Send + Sync> {
Box::new(SiphashRoundRobinPartitioner::new())
}
fn default_timeout() -> Duration {
Duration::from_millis(DEFAULT_TIMEOUT_MS)
}
fn default_isolation() -> Isolation {
Isolation::default()
}
#[cfg(feature = "stats")]
fn default_stats_collect() -> ClientStatsDataCollect {
ClientStatsDataCollect::default()
}
fn default_delivery() -> DeliverySemantic {
DeliverySemantic::default()
}
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct TopicProducerConfig {
#[builder(default = "default_batch_size()")]
pub(crate) batch_size: usize,
#[builder(default = "default_batch_queue_size()")]
pub(crate) batch_queue_size: usize,
#[builder(default = "default_linger_duration()")]
pub(crate) linger: Duration,
#[builder(default = "default_partitioner()")]
pub(crate) partitioner: Box<dyn Partitioner + Send + Sync>,
#[builder(setter(into, strip_option), default)]
pub(crate) compression: Option<Compression>,
#[builder(default = "default_timeout()")]
pub(crate) timeout: Duration,
#[builder(default = "default_isolation()")]
pub(crate) isolation: Isolation,
#[cfg(feature = "stats")]
#[builder(default = "default_stats_collect()")]
pub(crate) stats_collect: ClientStatsDataCollect,
#[builder(default = "default_delivery()")]
pub(crate) delivery_semantic: DeliverySemantic,
#[builder(default)]
pub(crate) smartmodules: Vec<SmartModuleInvocation>,
}
impl Default for TopicProducerConfig {
fn default() -> Self {
Self {
linger: default_linger_duration(),
batch_size: default_batch_size(),
batch_queue_size: default_batch_queue_size(),
partitioner: default_partitioner(),
compression: None,
timeout: default_timeout(),
isolation: default_isolation(),
#[cfg(feature = "stats")]
stats_collect: default_stats_collect(),
delivery_semantic: default_delivery(),
smartmodules: vec![],
}
}
}
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum DeliverySemantic {
AtMostOnce,
AtLeastOnce(RetryPolicy),
}
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct RetryPolicy {
pub max_retries: usize,
pub initial_delay: Duration,
pub max_delay: Duration,
pub timeout: Duration,
pub strategy: RetryStrategy,
}
impl Default for DeliverySemantic {
fn default() -> Self {
Self::AtLeastOnce(RetryPolicy::default())
}
}
impl Display for DeliverySemantic {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{self:?}")
}
}
impl FromStr for DeliverySemantic {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"at_most_once" | "at-most-once" | "AtMostOnce" | "atMostOnce" | "atmostonce" => Ok(DeliverySemantic::AtMostOnce),
"at_least_once" | "at-least-once" | "AtLeastOnce" | "atLeastOnce" | "atleastonce" => Ok(DeliverySemantic::default()),
_ => Err(format!("unrecognized delivery semantic: {s}. Supported: at_most_once (AtMostOnce), at_least_once (AtLeastOnce)")),
}
}
}
impl Default for RetryPolicy {
fn default() -> Self {
Self {
max_retries: DEFAULT_MAX_RETRIES,
initial_delay: DEFAULT_INITIAL_DELAY,
max_delay: DEFAULT_MAX_DELAY,
timeout: DEFAULT_RETRIES_TIMEOUT,
strategy: RetryStrategy::ExponentialBackoff,
}
}
}
#[derive(Default, Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum RetryStrategy {
FixedDelay,
#[default]
ExponentialBackoff,
FibonacciBackoff,
}
#[derive(Debug)]
enum RetryPolicyIter {
FixedDelay(FixedDelay),
ExponentialBackoff(ExponentialBackoff),
FibonacciBackoff(FibonacciBackoff),
}
impl Iterator for RetryPolicyIter {
type Item = Duration;
fn next(&mut self) -> Option<Self::Item> {
match self {
RetryPolicyIter::FixedDelay(iter) => iter.next(),
RetryPolicyIter::ExponentialBackoff(iter) => iter.next(),
RetryPolicyIter::FibonacciBackoff(iter) => iter.next(),
}
}
}
impl RetryPolicy {
pub fn iter(&self) -> impl Iterator<Item = Duration> + Debug + Send {
match self.strategy {
RetryStrategy::FixedDelay => {
RetryPolicyIter::FixedDelay(FixedDelay::new(self.initial_delay))
}
RetryStrategy::ExponentialBackoff => RetryPolicyIter::ExponentialBackoff(
ExponentialBackoff::from_millis(self.initial_delay.as_millis() as u64)
.max_delay(self.max_delay),
),
RetryStrategy::FibonacciBackoff => RetryPolicyIter::FibonacciBackoff(
FibonacciBackoff::new(self.initial_delay).max_delay(self.max_delay),
),
}
.take(self.max_retries)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_retry_policy_fixed_iter() {
let duration = Duration::from_millis(10);
let policy = RetryPolicy {
max_retries: 3,
initial_delay: duration,
strategy: RetryStrategy::FixedDelay,
..Default::default()
};
let iter = policy.iter();
assert_eq!(iter.collect::<Vec<Duration>>(), [duration; 3])
}
#[test]
fn test_retry_policy_exponential_iter() {
let duration = Duration::from_millis(10);
let max_duration = Duration::from_millis(2000);
let policy = RetryPolicy {
max_retries: 5,
initial_delay: duration,
max_delay: max_duration,
strategy: RetryStrategy::ExponentialBackoff,
..Default::default()
};
let iter = policy.iter();
assert_eq!(
iter.collect::<Vec<Duration>>(),
[
duration,
Duration::from_millis(100),
Duration::from_millis(1000),
max_duration,
max_duration
]
)
}
#[test]
fn test_retry_policy_fibonacci_iter() {
let duration = Duration::from_millis(10);
let max_duration = Duration::from_millis(30);
let policy = RetryPolicy {
max_retries: 5,
initial_delay: duration,
max_delay: max_duration,
strategy: RetryStrategy::FibonacciBackoff,
..Default::default()
};
let iter = policy.iter();
assert_eq!(
iter.collect::<Vec<Duration>>(),
[
duration,
Duration::from_millis(10),
Duration::from_millis(20),
max_duration,
max_duration
]
)
}
#[test]
fn test_retry_policy_never_retry() {
let policy = RetryPolicy {
max_retries: 0,
..Default::default()
};
let iter = policy.iter();
assert_eq!(iter.collect::<Vec<Duration>>(), [])
}
}