1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
use std::time::Duration;

use serde::{Deserialize, Serialize};

new_type! {
    #[doc="The internal tick interval.\n\nThe applied value is always between [100..5_000] ms. \
    When set outside of its bounds it will be adjusted to fit within the bounds.\n\n"]
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
    pub copy struct TickIntervalMillis(u64, env="TICK_INTERVAL_MILLIS");
}
impl TickIntervalMillis {
    pub fn into_duration(self) -> Duration {
        Duration::from_millis(self.0)
    }

    /// Only 100ms up to 5_000ms allowed. We simply adjust the
    /// values because there is no reason to crash if these have been set
    /// to an out of range value.
    pub fn adjust(self) -> TickIntervalMillis {
        std::cmp::min(5_000, std::cmp::max(100, self.0)).into()
    }
}
impl Default for TickIntervalMillis {
    fn default() -> Self {
        1000.into()
    }
}
impl From<TickIntervalMillis> for Duration {
    fn from(v: TickIntervalMillis) -> Self {
        v.into_duration()
    }
}

new_type! {
    #[doc="The time after which a stream or partition is considered inactive.\n"]
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
    pub copy struct InactivityTimeoutSecs(u64, env="INACTIVITY_TIMEOUT_SECS");
}
impl InactivityTimeoutSecs {
    pub fn into_duration(self) -> Duration {
        Duration::from_secs(self.0)
    }
}
impl From<InactivityTimeoutSecs> for Duration {
    fn from(v: InactivityTimeoutSecs) -> Self {
        v.into_duration()
    }
}

new_type! {
    #[doc="The time after which a stream is considered stuck and has to be aborted.\n"]
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
    pub copy struct StreamDeadTimeoutSecs(u64, env="STREAM_DEAD_TIMEOUT_SECS");
}
impl StreamDeadTimeoutSecs {
    pub fn into_duration(self) -> Duration {
        Duration::from_secs(self.0)
    }
}
impl From<StreamDeadTimeoutSecs> for Duration {
    fn from(v: StreamDeadTimeoutSecs) -> Self {
        v.into_duration()
    }
}
new_type! {
    #[doc="Emits a warning when no lines were received from Nakadi.\n"]
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
    pub copy struct WarnStreamStalledSecs(u64, env="WARN_STREAM_STALLED_SECS");
}
impl WarnStreamStalledSecs {
    pub fn into_duration(self) -> Duration {
        Duration::from_secs(self.0)
    }
}
impl From<WarnStreamStalledSecs> for Duration {
    fn from(v: WarnStreamStalledSecs) -> Self {
        v.into_duration()
    }
}

new_type! {
    #[doc="If `true` abort the consumer when an auth error occurs while connecting to a stream.\n"]
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
    pub copy struct AbortConnectOnAuthError(bool, env="ABORT_CONNECT_ON_AUTH_ERROR");
}
impl Default for AbortConnectOnAuthError {
    fn default() -> Self {
        false.into()
    }
}
new_type! {
    #[doc="If `true` abort the consumer when a subscription does not exist when connection to a stream.\n"]
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
    pub copy struct AbortConnectOnSubscriptionNotFound(bool, env="ABORT_CONNECT_ON_SUBSCRIPTION_NOT_FOUND");
}
impl Default for AbortConnectOnSubscriptionNotFound {
    fn default() -> Self {
        true.into()
    }
}

new_type! {
    #[doc="The maximum retry delay between failed attempts to connect to a stream.\n"]
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
    pub copy struct ConnectStreamRetryMaxDelaySecs(u64, env="CONNECT_STREAM_RETRY_MAX_DELAY_SECS");
}
impl Default for ConnectStreamRetryMaxDelaySecs {
    fn default() -> Self {
        300.into()
    }
}
impl ConnectStreamRetryMaxDelaySecs {
    pub fn into_duration(self) -> Duration {
        Duration::from_secs(self.0)
    }
}
impl From<ConnectStreamRetryMaxDelaySecs> for Duration {
    fn from(v: ConnectStreamRetryMaxDelaySecs) -> Self {
        v.into_duration()
    }
}
new_type! {
    #[doc="The timeout for a request made to Nakadi to connect to a stream.\n"]
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
    pub copy struct ConnectStreamTimeoutSecs(u64, env="CONNECT_STREAM_TIMEOUT_SECS");
}
impl ConnectStreamTimeoutSecs {
    pub fn into_duration(self) -> Duration {
        Duration::from_secs(self.0)
    }
}
impl Default for ConnectStreamTimeoutSecs {
    fn default() -> Self {
        10.into()
    }
}
impl From<ConnectStreamTimeoutSecs> for Duration {
    fn from(v: ConnectStreamTimeoutSecs) -> Self {
        v.into_duration()
    }
}

new_type! {
    #[doc="The timeout for a request made to Nakadi to commit cursors.\n"]
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
    pub copy struct CommitAttemptTimeoutMillis(u64, env="COMMIT_ATTEMPT_TIMEOUT_MILLIS");
}
impl CommitAttemptTimeoutMillis {
    pub fn into_duration(self) -> Duration {
        Duration::from_millis(self.0)
    }
}
impl Default for CommitAttemptTimeoutMillis {
    fn default() -> Self {
        1000.into()
    }
}
impl From<CommitAttemptTimeoutMillis> for Duration {
    fn from(v: CommitAttemptTimeoutMillis) -> Self {
        v.into_duration()
    }
}

new_type! {
    #[doc="The delay between failed attempts to commit cursors.\n"]
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
    pub copy struct CommitRetryDelayMillis(u64, env="COMMIT_RETRY_DELAY_MILLIS");
}
impl CommitRetryDelayMillis {
    pub fn into_duration(self) -> Duration {
        Duration::from_millis(self.0)
    }
}
impl Default for CommitRetryDelayMillis {
    fn default() -> Self {
        500.into()
    }
}
impl From<CommitRetryDelayMillis> for Duration {
    fn from(v: CommitRetryDelayMillis) -> Self {
        v.into_duration()
    }
}