Struct nakadion::consumer::Builder
#[non_exhaustive]pub struct Builder {
pub subscription_id: Option<SubscriptionId>,
pub instrumentation: Option<Instrumentation>,
pub tick_interval_millis: Option<TickIntervalMillis>,
pub handler_inactivity_timeout_secs: Option<HandlerInactivityTimeoutSecs>,
pub partition_inactivity_timeout_secs: Option<PartitionInactivityTimeoutSecs>,
pub stream_dead_policy: Option<StreamDeadPolicy>,
pub warn_no_frames_secs: Option<WarnNoFramesSecs>,
pub warn_no_events_secs: Option<WarnNoEventsSecs>,
pub dispatch_mode: Option<DispatchMode>,
pub log_partition_events_mode: Option<LogPartitionEventsMode>,
pub commit_config: CommitConfig,
pub connect_config: ConnectConfig,
}
Creates a Consumer
This struct configures and creates a consumer. Before a consumer is built, the given values are validated and defaults are applied.
The Builder can be created and updated from the environment. When it is updated from the environment, only values that have not been set before are updated.
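The "only fill what is still unset" rule can be illustrated with a minimal sketch. `EnvBuilderSketch`, its two fields, and the `update_from` method are hypothetical stand-ins and not part of Nakadion; the environment is passed in as a lookup closure so the behaviour is easy to test.

```rust
/// Hypothetical, reduced stand-in for the real `Builder` (not Nakadion's type).
#[derive(Debug, Default, Clone, PartialEq)]
pub struct EnvBuilderSketch {
    pub subscription_id: Option<String>,
    pub tick_interval_millis: Option<u64>,
}

impl EnvBuilderSketch {
    /// Fills only the fields that are still `None`; values set earlier win.
    /// `lookup` abstracts the environment. Pass `|k| std::env::var(k).ok()`
    /// to read real environment variables.
    pub fn update_from<F>(mut self, lookup: F) -> Result<Self, String>
    where
        F: Fn(&str) -> Option<String>,
    {
        if self.subscription_id.is_none() {
            self.subscription_id = lookup("NAKADION_SUBSCRIPTION_ID");
        }
        if self.tick_interval_millis.is_none() {
            if let Some(raw) = lookup("NAKADION_TICK_INTERVAL_MILLIS") {
                // A value that is present but malformed is reported as an error.
                let parsed = raw
                    .parse::<u64>()
                    .map_err(|e| format!("invalid tick interval '{}': {}", raw, e))?;
                self.tick_interval_millis = Some(parsed);
            }
        }
        Ok(self)
    }
}
```

A value set explicitly before calling `update_from` is left untouched, while a missing one is taken from the lookup if available.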
De-/Serialization
The Builder supports serialization, but the instrumentation is never part of any serialization and therefore defaults to None.
Environment
When initialized or updated from the environment, the following environment variables are used. By default they are prefixed with “NAKADION_”; a custom prefix can be used instead.
For Nakadion itself:
- “SUBSCRIPTION_ID”
- “TICK_INTERVAL_MILLIS”
- “INACTIVITY_TIMEOUT_SECS”
- “STREAM_DEAD_POLICY”
- “WARN_STREAM_STALLED_SECS”
- “DISPATCH_MODE”
- “COMMIT_STRATEGY”
- “ABORT_CONNECT_ON_AUTH_ERROR”
- “ABORT_CONNECT_ON_SUBSCRIPTION_NOT_FOUND”
- “CONNECT_STREAM_RETRY_MAX_DELAY_SECS”
- “CONNECT_STREAM_TIMEOUT_SECS”
- “COMMIT_ATTEMPT_TIMEOUT_MILLIS”
- “COMMIT_RETRY_DELAY_MILLIS”
For stream_parameters:
- “STREAM_MAX_UNCOMMITTED_EVENTS”
- “STREAM_BATCH_LIMIT”
- “STREAM_LIMIT”
- “STREAM_BATCH_FLUSH_TIMEOUT_SECS”
- “STREAM_BATCH_TIMESPAN_SECS”
- “STREAM_TIMEOUT_SECS”
- “STREAM_COMMIT_TIMEOUT_SECS”
Fields (Non-exhaustive)
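This struct is marked as non-exhaustive
Non-exhaustive structs could have additional fields added in future. Therefore, non-exhaustive structs cannot be constructed in external crates using the traditional Struct { .. } syntax; cannot be matched against without a wildcard ..; and struct update syntax will not work.
subscription_id: Option<SubscriptionId>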
The SubscriptionId of the subscription to be consumed.
This value must be set.
instrumentation: Option<Instrumentation>
The instrumentation used to generate metrics.
tick_interval_millis: Option<TickIntervalMillis>
The internal tick interval.
This triggers internal notifications used to monitor the state of the currently consumed stream.
handler_inactivity_timeout_secs: Option<HandlerInactivityTimeoutSecs>
The time after which a handler is considered inactive.
If no batches have been processed for the given amount of time, the handler will be notified by calling its on_inactive method.
The default is never.
partition_inactivity_timeout_secs: Option<PartitionInactivityTimeoutSecs>
The time after which a partition is considered inactive.
If no batches have been processed for the given amount of time, the partition will be considered inactive.
The default is 90 seconds.
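The inactivity rule for handlers and partitions can be sketched as a timestamp comparison. `InactivityTracker` is a hypothetical helper and not a Nakadion type; `None` models the "never" default of the handler timeout, and treating exactly-reached timeouts as inactive is an assumption.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper (not part of Nakadion) that models an inactivity timeout.
pub struct InactivityTracker {
    last_batch_at: Instant,
    /// `None` models "never becomes inactive".
    timeout: Option<Duration>,
}

impl InactivityTracker {
    pub fn new(now: Instant, timeout: Option<Duration>) -> Self {
        InactivityTracker {
            last_batch_at: now,
            timeout,
        }
    }

    /// Record that a batch was processed at `now`.
    pub fn batch_processed(&mut self, now: Instant) {
        self.last_batch_at = now;
    }

    /// True once no batch has been processed for at least the timeout.
    pub fn is_inactive(&self, now: Instant) -> bool {
        match self.timeout {
            Some(t) => now.saturating_duration_since(self.last_batch_at) >= t,
            None => false,
        }
    }
}
```

Passing the current time in explicitly keeps the check deterministic; a tick, such as the one configured by tick_interval_millis, would be a natural place to call `is_inactive`.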
stream_dead_policy: Option<StreamDeadPolicy>
The time after which a stream is considered stuck and has to be aborted.
warn_no_frames_secs: Option<WarnNoFramesSecs>
Emits a warning when no frames (lines) were received from Nakadi for the given time.
warn_no_events_secs: Option<WarnNoEventsSecs>
Emits a warning when no events were received from Nakadi for the given time.
dispatch_mode: Option<DispatchMode>
Defines how batches are internally dispatched.
This e.g. configures parallelism.
log_partition_events_mode: Option<LogPartitionEventsMode>
Configure how partition events are logged.
commit_config: CommitConfig
Defines when to commit cursors.
It is recommended to set this value instead of letting Nakadion determine defaults.
connect_config: ConnectConfig
Implementations
Initializes all fields from environment variables prefixed with “NAKADION_”
Initializes all fields from environment variables prefixed with “[prefix]_”
The underscore is omitted if prefix is empty.
Initializes all fields from environment variables without any prefix
Updates all not yet set fields from environment variables prefixed with “NAKADION_”
Updates all not yet set fields from environment variables prefixed with “[prefix]_”
The underscore is omitted if prefix is empty.
Updates all not yet set fields from environment variables without any prefix
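The prefix rule described for these functions can be sketched as a small helper. `env_var_name` is a hypothetical function written for illustration; it only mirrors the documented behaviour (prefix, then an underscore, with the underscore omitted for an empty prefix) and makes no claim about how Nakadion implements it.

```rust
/// Hypothetical helper (not part of Nakadion): builds the full name of an
/// environment variable from a prefix and a setting name.
pub fn env_var_name(prefix: &str, name: &str) -> String {
    if prefix.is_empty() {
        // No prefix: the underscore is omitted as well.
        name.to_string()
    } else {
        format!("{}_{}", prefix, name)
    }
}
```

With the default prefix, `env_var_name("NAKADION", "SUBSCRIPTION_ID")` yields `NAKADION_SUBSCRIPTION_ID`; with an empty prefix the bare name is used.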
The SubscriptionId of the subscription to be consumed.
This value must be set.
Parameters that configure the stream to be consumed.
These are part of the connect_config, so they will be created with defaults if not already present.
The instrumentation to be used to generate metrics
The internal tick interval.
This triggers internal notifications used to monitor the state of the currently consumed stream.
pub fn handler_inactivity_timeout_secs<T: Into<HandlerInactivityTimeoutSecs>>(
self,
handler_inactivity_timeout_secs: T
) -> Self
The time after which a handler is considered inactive.
If no batches have been processed for the given amount of time, the handler will be notified by calling its on_inactive method.
The default is never.
pub fn partition_inactivity_timeout_secs<T: Into<PartitionInactivityTimeoutSecs>>(
self,
partition_inactivity_timeout_secs: T
) -> Self
The time after which a partition is considered inactive.
If no batches have been processed for the given amount of time, the partition will be considered inactive.
The default is 90 seconds.
Define when a stream is considered stuck/dead and has to be aborted.
Emits a warning when no frames (lines) were received from Nakadi for the given time.
Emits a warning when no events were received from Nakadi for the given time.
Defines how batches are internally dispatched.
This e.g. configures parallelism.
pub fn log_partition_events_mode(
self,
log_partition_events_mode: LogPartitionEventsMode
) -> Self
Configure how partition events are logged.
Defines how to connect to Nakadi to consume events.
Defines how to connect to Nakadi.
pub fn configure_connector<F>(self, f: F) -> Self where
F: FnMut(ConnectConfig) -> ConnectConfig,
Modify the current ConnectConfig with a closure.
If it has not been set, ConnectConfig::default() will be passed into the closure.
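The closure-based configuration style can be sketched as follows. `ConnectorSketch` and `ConnectConfigSketch`, including their fields, are hypothetical stand-ins for Builder and ConnectConfig; only the shape of the method (take the current config, hand it to the closure, store the result) follows the signature shown above.

```rust
/// Hypothetical stand-in for `ConnectConfig` (field names are assumptions).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ConnectConfigSketch {
    pub timeout_secs: Option<u64>,
    pub abort_on_auth_error: Option<bool>,
}

/// Hypothetical stand-in for `Builder`.
#[derive(Debug, Default, PartialEq)]
pub struct ConnectorSketch {
    pub connect_config: ConnectConfigSketch,
}

impl ConnectorSketch {
    /// Passes the current config (or its default) to `f` and stores the result.
    pub fn configure_connector<F>(mut self, mut f: F) -> Self
    where
        F: FnMut(ConnectConfigSketch) -> ConnectConfigSketch,
    {
        // `take` moves the current config out and leaves a default in place.
        let current = std::mem::take(&mut self.connect_config);
        self.connect_config = f(current);
        self
    }
}
```

Because the closure receives the current value, repeated calls accumulate: a second call that sets `abort_on_auth_error` keeps a `timeout_secs` set by the first.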
pub fn try_configure_connector<F>(self, f: F) -> Result<Self, Error> where
F: FnMut(ConnectConfig) -> Result<ConnectConfig, Error>,
Modify the current ConnectConfig with a closure.
If it has not been set, ConnectConfig::default() will be passed into the closure.
If the closure fails, the whole Builder will fail.
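The fallible variant can be sketched in the same way. `TryConnectorSketch` and `TryConnectConfigSketch` are hypothetical stand-ins, and `String` is used as the error type purely for illustration; the point is that an error from the closure is propagated and no builder is returned.

```rust
/// Hypothetical stand-in for `ConnectConfig` (the field is an assumption).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TryConnectConfigSketch {
    pub timeout_secs: Option<u64>,
}

/// Hypothetical stand-in for `Builder`.
#[derive(Debug, Default, PartialEq)]
pub struct TryConnectorSketch {
    pub connect_config: TryConnectConfigSketch,
}

impl TryConnectorSketch {
    /// Like the infallible variant, but an `Err` from the closure
    /// aborts the whole chain: the builder is consumed and not returned.
    pub fn try_configure_connector<F>(mut self, mut f: F) -> Result<Self, String>
    where
        F: FnMut(TryConnectConfigSketch) -> Result<TryConnectConfigSketch, String>,
    {
        let current = std::mem::take(&mut self.connect_config);
        self.connect_config = f(current)?;
        Ok(self)
    }
}
```

This shape is convenient when a value has to be parsed first, for example from a string, and a parse failure should stop the configuration.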
pub fn configure_committer<F>(self, f: F) -> Self where
F: FnMut(CommitConfig) -> CommitConfig,
Modify the current CommitConfig with a closure.
If it has not been set, CommitConfig::default() will be passed into the closure.
pub fn try_configure_committer<F>(self, f: F) -> Result<Self, Error> where
F: FnMut(CommitConfig) -> Result<CommitConfig, Error>,
Modify the current CommitConfig with a closure.
If it has not been set, CommitConfig::default() will be passed into the closure.
If the closure fails, the whole Builder will fail.
pub fn configure_stream_parameters<F>(self, f: F) -> Self where
F: FnMut(StreamParameters) -> StreamParameters,
Modify the current StreamParameters with a closure.
If these have not been set, StreamParameters::default() will be passed into the closure.
pub fn try_configure_stream_parameters<F>(self, f: F) -> Result<Self, Error> where
F: FnMut(StreamParameters) -> Result<StreamParameters, Error>,
Modify the current StreamParameters with a closure.
If these have not been set, StreamParameters::default() will be passed into the closure.
If the closure fails, the whole Builder will fail.
Use the given Metrix instrumentation as the instrumentation
pub fn metrix_mounted<A: AggregatesProcessors>(
self,
config: &MetrixConfig,
detail: MetricsDetailLevel,
processor: &mut A
) -> Self
Create new Metrix instrumentation and put it into the given processor with the optional name.
Applies the defaults to all values that have not been set so far.
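Applying defaults and validating required values can be sketched as two small steps. `DefaultsSketch` and both of its methods are hypothetical and use a reduced field set; the 90 second default and the requirement that the subscription id must be set are taken from the field descriptions above, while everything else is an assumption.

```rust
/// Hypothetical, reduced stand-in for `Builder` (not Nakadion's type).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct DefaultsSketch {
    pub subscription_id: Option<String>,
    pub partition_inactivity_timeout_secs: Option<u64>,
}

impl DefaultsSketch {
    /// Fills in defaults for values that are still unset.
    pub fn apply_defaults(&mut self) {
        if self.partition_inactivity_timeout_secs.is_none() {
            // 90 seconds is the default stated in the field documentation.
            self.partition_inactivity_timeout_secs = Some(90);
        }
    }

    /// Checks values that have no default and therefore must be set.
    pub fn validate(&self) -> Result<(), String> {
        if self.subscription_id.is_none() {
            return Err("subscription_id must be set".to_string());
        }
        Ok(())
    }
}
```

Values that were set explicitly survive `apply_defaults` unchanged; only a missing required value makes `validate` fail.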
pub fn build_with<C, HF, L>(
&self,
api_client: C,
handler_factory: HF,
logs: L
) -> Result<Consumer, Error> where
C: StreamingEssentials + Send + Sync + 'static + Clone,
HF: BatchHandlerFactory,
L: LoggingAdapter,
Create a Consumer
pub fn build_with_tracker<C, HF, L>(
&self,
api_client: C,
handler_factory: HF,
logs: L
) -> Result<Consumer, Error> where
C: StreamingEssentials + SubscriptionApi + Send + Sync + 'static + Clone,
HF: BatchHandlerFactory,
L: LoggingAdapter + Clone,
Create a Consumer which tracks stats about the stream and exposes them to the instrumentation set with the builder.
Tracking is disabled if no instrumentation is set.
Trait Implementations
fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error> where
__D: Deserializer<'de>,
Deserialize this value from the given Serde deserializer.
Auto Trait Implementations
impl !RefUnwindSafe for Builder
impl !UnwindSafe for Builder
Blanket Implementations
Mutably borrows from an owned value.
pub fn vzip(self) -> V
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.