1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
use std::fmt::{Debug, Display, Formatter};
use std::str::FromStr;
use std::time::Duration;

use derive_builder::Builder;

use fluvio_future::retry::{ExponentialBackoff, FibonacciBackoff, FixedDelay};
use fluvio_spu_schema::Isolation;
use fluvio_spu_schema::server::smartmodule::SmartModuleInvocation;

use fluvio_compression::Compression;
use serde::{Serialize, Deserialize};

use crate::producer::partitioning::{Partitioner, SiphashRoundRobinPartitioner};
#[cfg(feature = "stats")]
use crate::stats::ClientStatsDataCollect;

const DEFAULT_LINGER_MS: u64 = 100;
const DEFAULT_TIMEOUT_MS: u64 = 1500;
const DEFAULT_BATCH_SIZE_BYTES: usize = 16_384;
const DEFAULT_BATCH_QUEUE_SIZE: usize = 100;

const DEFAULT_RETRIES_TIMEOUT: Duration = Duration::from_secs(300);
const DEFAULT_INITIAL_DELAY: Duration = Duration::from_millis(20);
const DEFAULT_MAX_DELAY: Duration = Duration::from_secs(200);
const DEFAULT_MAX_RETRIES: usize = 4;

fn default_batch_size() -> usize {
    DEFAULT_BATCH_SIZE_BYTES
}

fn default_batch_queue_size() -> usize {
    DEFAULT_BATCH_QUEUE_SIZE
}

fn default_linger_duration() -> Duration {
    Duration::from_millis(DEFAULT_LINGER_MS)
}

fn default_partitioner() -> Box<dyn Partitioner + Send + Sync> {
    Box::new(SiphashRoundRobinPartitioner::new())
}

fn default_timeout() -> Duration {
    Duration::from_millis(DEFAULT_TIMEOUT_MS)
}

fn default_isolation() -> Isolation {
    Isolation::default()
}

#[cfg(feature = "stats")]
fn default_stats_collect() -> ClientStatsDataCollect {
    ClientStatsDataCollect::default()
}

fn default_delivery() -> DeliverySemantic {
    DeliverySemantic::default()
}

/// Options used to adjust the behavior of the Producer.
/// Create this struct with [`TopicProducerConfigBuilder`].
///
/// Create a producer with a custom config with [`crate::Fluvio::topic_producer_with_config()`].
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct TopicProducerConfig {
    /// Maximum amount of bytes accumulated by the records before sending the batch.
    #[builder(default = "default_batch_size()")]
    pub(crate) batch_size: usize,
    /// Maximum amount of batches waiting in the queue before sending to the SPU.
    #[builder(default = "default_batch_queue_size()")]
    pub(crate) batch_queue_size: usize,
    /// Time to wait before sending messages to the server.
    #[builder(default = "default_linger_duration()")]
    pub(crate) linger: Duration,
    /// Partitioner assigns the partition to each record that needs to be send
    #[builder(default = "default_partitioner()")]
    pub(crate) partitioner: Box<dyn Partitioner + Send + Sync>,

    /// Compression algorithm used by Fluvio producer to compress data.
    /// If there is a topic level compression and it is not compatible with this setting, the producer
    /// initialization will fail.
    #[builder(setter(into, strip_option), default)]
    pub(crate) compression: Option<Compression>,

    /// Max time duration that the server is allowed to process the batch.
    #[builder(default = "default_timeout()")]
    pub(crate) timeout: Duration,

    /// [`Isolation`] level that the producer must respect.
    /// [`Isolation::ReadCommitted`] waits for messages to be committed (replicated) before
    /// sending the response to the caller.
    /// [`Isolation::ReadUncommitted`] just waits for the leader to accept the message.
    #[builder(default = "default_isolation()")]
    pub(crate) isolation: Isolation,

    #[cfg(feature = "stats")]
    /// Collect resource and data transfer stats used by Fluvio producer
    #[builder(default = "default_stats_collect()")]
    pub(crate) stats_collect: ClientStatsDataCollect,

    /// Delivery guarantees that producer must respect.
    /// [`DeliverySemantic::AtMostOnce`] - send records without waiting from response. `Fire and forget`
    /// approach.
    /// [`DeliverySemantic::AtLeastOnce`] - send records, wait for the response and retry
    /// if error occurred. Retry parameters, such as delay, retry strategy, timeout, etc.,
    /// can be configured in [`RetryPolicy`].
    #[builder(default = "default_delivery()")]
    pub(crate) delivery_semantic: DeliverySemantic,

    #[builder(default)]
    pub(crate) smartmodules: Vec<SmartModuleInvocation>,
}

impl Default for TopicProducerConfig {
    fn default() -> Self {
        Self {
            linger: default_linger_duration(),
            batch_size: default_batch_size(),
            batch_queue_size: default_batch_queue_size(),
            partitioner: default_partitioner(),
            compression: None,
            timeout: default_timeout(),
            isolation: default_isolation(),

            #[cfg(feature = "stats")]
            stats_collect: default_stats_collect(),
            delivery_semantic: default_delivery(),
            smartmodules: vec![],
        }
    }
}

/// Defines guarantees that Producer must follow delivering records to SPU.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum DeliverySemantic {
    /// Send records without waiting for the response. `Fire and forget` approach.
    AtMostOnce,
    /// Send records, wait for the response and retry if an error occurs. Retry parameters,
    /// such as delay, retry strategy, timeout, etc., can be configured in [`RetryPolicy`].
    AtLeastOnce(RetryPolicy),
}

/// Defines parameters of retries in [`DeliverySemantic::AtLeastOnce`] delivery semantic.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct RetryPolicy {
    /// Max amount of retries. If `0`, no retries will be performed.
    pub max_retries: usize,

    /// Initial delay before the first retry.
    pub initial_delay: Duration,

    /// The upper limit for delay for [`RetryStrategy::ExponentialBackoff`] and
    /// [`RetryStrategy::FibonacciBackoff`] retry strategies.
    pub max_delay: Duration,

    /// Max duration for all retries.
    pub timeout: Duration,

    /// Defines the strategy of delays distribution.
    pub strategy: RetryStrategy,
}

impl Default for DeliverySemantic {
    fn default() -> Self {
        Self::AtLeastOnce(RetryPolicy::default())
    }
}

impl Display for DeliverySemantic {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{self:?}")
    }
}

impl FromStr for DeliverySemantic {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "at_most_once" | "at-most-once" | "AtMostOnce" | "atMostOnce" | "atmostonce" => Ok(DeliverySemantic::AtMostOnce),
            "at_least_once" | "at-least-once" | "AtLeastOnce" | "atLeastOnce" | "atleastonce" => Ok(DeliverySemantic::default()),
            _ => Err(format!("unrecognized delivery semantic: {s}. Supported: at_most_once (AtMostOnce), at_least_once (AtLeastOnce)")),
        }
    }
}

impl Default for RetryPolicy {
    fn default() -> Self {
        Self {
            max_retries: DEFAULT_MAX_RETRIES,
            initial_delay: DEFAULT_INITIAL_DELAY,
            max_delay: DEFAULT_MAX_DELAY,
            timeout: DEFAULT_RETRIES_TIMEOUT,
            strategy: RetryStrategy::ExponentialBackoff,
        }
    }
}

/// Strategy of delays distribution.
#[derive(Default, Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum RetryStrategy {
    /// A retry strategy is driven by a fixed interval between retries.
    FixedDelay,
    /// A retry strategy is driven by exponential back-off.
    /// For example, if you have five retries with initial delay of 10ms, the sequence will be:
    /// `[10ms, 100ms, 1s, 200s, 200s]`
    #[default]
    ExponentialBackoff,
    /// A retry strategy is driven by the Fibonacci series of intervals between retries.
    /// For example, if you have five retries with initial delay of 10ms, the sequence will be
    /// `[10ms, 10ms, 20ms, 30ms, 50ms]`
    FibonacciBackoff,
}

#[derive(Debug)]
enum RetryPolicyIter {
    FixedDelay(FixedDelay),
    ExponentialBackoff(ExponentialBackoff),
    FibonacciBackoff(FibonacciBackoff),
}

impl Iterator for RetryPolicyIter {
    type Item = Duration;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            RetryPolicyIter::FixedDelay(iter) => iter.next(),
            RetryPolicyIter::ExponentialBackoff(iter) => iter.next(),
            RetryPolicyIter::FibonacciBackoff(iter) => iter.next(),
        }
    }
}

impl RetryPolicy {
    pub fn iter(&self) -> impl Iterator<Item = Duration> + Debug + Send {
        match self.strategy {
            RetryStrategy::FixedDelay => {
                RetryPolicyIter::FixedDelay(FixedDelay::new(self.initial_delay))
            }
            RetryStrategy::ExponentialBackoff => RetryPolicyIter::ExponentialBackoff(
                ExponentialBackoff::from_millis(self.initial_delay.as_millis() as u64)
                    .max_delay(self.max_delay),
            ),
            RetryStrategy::FibonacciBackoff => RetryPolicyIter::FibonacciBackoff(
                FibonacciBackoff::new(self.initial_delay).max_delay(self.max_delay),
            ),
        }
        .take(self.max_retries)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_retry_policy_fixed_iter() {
        //given
        let duration = Duration::from_millis(10);
        let policy = RetryPolicy {
            max_retries: 3,
            initial_delay: duration,
            strategy: RetryStrategy::FixedDelay,
            ..Default::default()
        };

        //when
        let iter = policy.iter();

        //then
        assert_eq!(iter.collect::<Vec<Duration>>(), [duration; 3])
    }

    #[test]
    fn test_retry_policy_exponential_iter() {
        //given
        let duration = Duration::from_millis(10);
        let max_duration = Duration::from_millis(2000);
        let policy = RetryPolicy {
            max_retries: 5,
            initial_delay: duration,
            max_delay: max_duration,
            strategy: RetryStrategy::ExponentialBackoff,
            ..Default::default()
        };

        //when
        let iter = policy.iter();

        //then
        assert_eq!(
            iter.collect::<Vec<Duration>>(),
            [
                duration,
                Duration::from_millis(100),
                Duration::from_millis(1000),
                max_duration,
                max_duration
            ]
        )
    }

    #[test]
    fn test_retry_policy_fibonacci_iter() {
        //given
        let duration = Duration::from_millis(10);
        let max_duration = Duration::from_millis(30);
        let policy = RetryPolicy {
            max_retries: 5,
            initial_delay: duration,
            max_delay: max_duration,
            strategy: RetryStrategy::FibonacciBackoff,
            ..Default::default()
        };

        //when
        let iter = policy.iter();

        //then
        assert_eq!(
            iter.collect::<Vec<Duration>>(),
            [
                duration,
                Duration::from_millis(10),
                Duration::from_millis(20),
                max_duration,
                max_duration
            ]
        )
    }

    #[test]
    fn test_retry_policy_never_retry() {
        //given
        let policy = RetryPolicy {
            max_retries: 0,
            ..Default::default()
        };

        //when
        let iter = policy.iter();

        //then
        assert_eq!(iter.collect::<Vec<Duration>>(), [])
    }
}