1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
use std::time::Duration;

use anyhow::Result;
use derive_builder::Builder;

use fluvio_spu_schema::{server::smartmodule::SmartModuleInvocation, Isolation};
use fluvio_types::PartitionId;

use crate::{FluvioError, Offset};

use super::MAX_FETCH_BYTES;

/// Default for [`ConsumerConfigExt::offset_flush`] when the builder does not set it: 10 seconds.
const DEFAULT_OFFSET_FLUSH_PERIOD: Duration = Duration::from_secs(10);

/// Configures the behavior of consumer fetching and streaming.
///
/// Instances are normally created through [`ConsumerConfig::builder`]. Every field has a
/// builder default, so none of them has to be set explicitly.
#[derive(Debug, Builder, Clone)]
// The generated build function is private and renamed to `build_impl`; the public
// `ConsumerConfigBuilder::build` defined in this file wraps it to convert the error type.
#[builder(build_fn(private, name = "build_impl"))]
pub struct ConsumerConfig {
    /// Defaults to `false`.
    ///
    /// NOTE(review): presumably stops the stream once the available records are consumed
    /// instead of waiting for new ones — confirm against the stream implementation.
    #[builder(default)]
    pub disable_continuous: bool,
    /// Defaults to `*MAX_FETCH_BYTES`.
    ///
    /// NOTE(review): presumably the upper bound on bytes requested per fetch — confirm.
    #[builder(default = "*MAX_FETCH_BYTES")]
    pub max_bytes: i32,
    /// Isolation setting for the consumer; defaults to `Isolation::default()`.
    #[builder(default)]
    pub isolation: Isolation,
    /// SmartModule invocations carried by this configuration; defaults to an empty list.
    #[builder(default)]
    pub smartmodule: Vec<SmartModuleInvocation>,
}

impl ConsumerConfig {
    pub fn builder() -> ConsumerConfigBuilder {
        ConsumerConfigBuilder::default()
    }
}

impl ConsumerConfigBuilder {
    /// Builds a [`ConsumerConfig`] from the options set on this builder.
    ///
    /// # Errors
    ///
    /// If the generated `build_impl` fails, its error is rendered into a
    /// [`FluvioError::ConsumerConfig`] message and returned as the `anyhow` error.
    pub fn build(&self) -> Result<ConsumerConfig> {
        self.build_impl().map_err(|e| {
            let message = format!("Missing required config option: {e}");
            FluvioError::ConsumerConfig(message).into()
        })
    }
}

/// Selects how consumed offsets are saved.
///
/// The default is [`OffsetManagementStrategy::None`].
#[derive(Debug, Default, Copy, Clone)]
pub enum OffsetManagementStrategy {
    /// Offsets are not saved.
    #[default]
    None,
    /// All operations must be invoked explicitly.
    Manual,
    /// Before yielding a new record to the caller, the previous record is committed and flushed if the configured interval has elapsed.
    /// Additionally, the commit and the flush are triggered when the stream object gets dropped.
    Auto,
}

/// Extended consumer configuration.
///
/// Carries the same fetch options as [`ConsumerConfig`] (`disable_continuous`, `max_bytes`,
/// `isolation`, `smartmodule`) plus topic/partition/mirror selection and offset-management
/// settings. `topic` and `offset_start` have no builder default and must be set.
#[derive(Debug, Builder, Clone)]
pub struct ConsumerConfigExt {
    /// Topic name. Required; the setter accepts anything convertible into `String`.
    #[builder(setter(into))]
    pub topic: String,
    /// Partition ids; defaults to an empty list.
    ///
    /// Uses the hand-written `ConsumerConfigExtBuilder::partition` setter, which appends one
    /// id per call.
    #[builder(default, setter(custom))]
    pub partition: Vec<PartitionId>,
    /// Optional mirror name; defaults to `None`. The setter takes the value directly
    /// (no `Some(..)` wrapping needed).
    #[builder(default, setter(strip_option, into))]
    pub mirror: Option<String>,
    /// Optional offset consumer name; defaults to `None`. The setter takes the value directly.
    ///
    /// NOTE(review): presumably the identifier under which offsets are stored — confirm.
    #[builder(default, setter(strip_option, into))]
    pub offset_consumer: Option<String>,
    /// Offset at which consumption starts. Required.
    pub offset_start: Offset,
    /// Offset management strategy; defaults to `OffsetManagementStrategy::None`.
    #[builder(default)]
    pub offset_strategy: OffsetManagementStrategy,
    /// Defaults to `DEFAULT_OFFSET_FLUSH_PERIOD`.
    ///
    /// NOTE(review): presumably the flush interval referred to by
    /// `OffsetManagementStrategy::Auto` — confirm.
    #[builder(default = "DEFAULT_OFFSET_FLUSH_PERIOD")]
    pub offset_flush: Duration,
    /// Defaults to `false`; copied into `ConsumerConfig::disable_continuous` by `into_parts`
    /// and the `From` conversion.
    ///
    /// NOTE(review): this is the only field that is not `pub` — confirm that is intentional.
    #[builder(default)]
    disable_continuous: bool,
    /// Defaults to `*MAX_FETCH_BYTES`; copied into `ConsumerConfig::max_bytes`.
    #[builder(default = "*MAX_FETCH_BYTES")]
    pub max_bytes: i32,
    /// Isolation setting; defaults to `Isolation::default()`.
    #[builder(default)]
    pub isolation: Isolation,
    /// SmartModule invocations; defaults to an empty list.
    #[builder(default)]
    pub smartmodule: Vec<SmartModuleInvocation>,
}

impl ConsumerConfigExt {
    /// Returns a fresh [`ConsumerConfigExtBuilder`] with no options set.
    pub fn builder() -> ConsumerConfigExtBuilder {
        Default::default()
    }

    /// Consumes the configuration and splits it into its components.
    ///
    /// The returned tuple is, in order: `offset_start`, a [`ConsumerConfig`] assembled from
    /// `disable_continuous` / `max_bytes` / `isolation` / `smartmodule`, `offset_consumer`,
    /// `offset_strategy` and `offset_flush`. The `topic`, `partition` and `mirror` fields are
    /// discarded.
    pub fn into_parts(
        self,
    ) -> (
        Offset,
        ConsumerConfig,
        Option<String>,
        OffsetManagementStrategy,
        Duration,
    ) {
        // Every field is named on purpose (no `..`): adding a field to the struct then fails
        // to compile here until it is handled.
        let Self {
            // Offset-related parts, returned as-is.
            offset_start,
            offset_consumer,
            offset_strategy,
            offset_flush,
            // Fetch options, regrouped into a `ConsumerConfig`.
            disable_continuous,
            max_bytes,
            isolation,
            smartmodule,
            // Not part of the result.
            topic: _,
            partition: _,
            mirror: _,
        } = self;

        (
            offset_start,
            ConsumerConfig {
                disable_continuous,
                max_bytes,
                isolation,
                smartmodule,
            },
            offset_consumer,
            offset_strategy,
            offset_flush,
        )
    }
}

impl ConsumerConfigExtBuilder {
    /// Appends `value` to the list of partitions, creating the list on first use.
    ///
    /// May be called repeatedly to add several partitions; returns the builder for chaining.
    pub fn partition(&mut self, value: PartitionId) -> &mut Self {
        let partitions = self.partition.get_or_insert_with(Vec::new);
        partitions.push(value);
        self
    }
}

impl From<ConsumerConfigExt> for ConsumerConfig {
    fn from(value: ConsumerConfigExt) -> Self {
        let ConsumerConfigExt {
            topic: _,
            partition: _,
            mirror: _,
            offset_consumer: _,
            offset_start: _,
            offset_strategy: _,
            offset_flush: _,
            disable_continuous,
            max_bytes,
            isolation,
            smartmodule,
        } = value;

        Self {
            disable_continuous,
            max_bytes,
            isolation,
            smartmodule,
        }
    }
}