use std::sync::Arc;
use serde::{Deserialize, Serialize};
use crate::components::StreamingEssentials;
use crate::handler::BatchHandlerFactory;
use crate::logging::LoggingAdapter;
use crate::nakadi_types::subscription::{StreamParameters, SubscriptionId};
use crate::Error;
use super::{Config, Consumer, Inner};
use crate::helpers::mandatory;
pub use crate::instrumentation::{Instrumentation, MetricsDetailLevel};
#[cfg(feature = "metrix")]
pub use crate::instrumentation::{Metrix, MetrixConfig};
#[cfg(feature = "metrix")]
pub use metrix::{processor::ProcessorMount, AggregatesProcessors};
mod new_types;
pub use new_types::*;
pub mod complex_types;
use crate::{
api::SubscriptionApi,
components::{
committer::{CommitConfig, CommitStrategy},
connector::ConnectConfig,
},
internals::controller::types::LifecycleListeners,
};
pub use complex_types::*;
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct Builder {
pub subscription_id: Option<SubscriptionId>,
#[serde(skip)]
pub instrumentation: Option<Instrumentation>,
pub tick_interval_millis: Option<TickIntervalMillis>,
pub handler_inactivity_timeout_secs: Option<HandlerInactivityTimeoutSecs>,
pub partition_inactivity_timeout_secs: Option<PartitionInactivityTimeoutSecs>,
pub stream_dead_policy: Option<StreamDeadPolicy>,
pub warn_no_frames_secs: Option<WarnNoFramesSecs>,
pub warn_no_events_secs: Option<WarnNoEventsSecs>,
pub dispatch_mode: Option<DispatchMode>,
pub log_partition_events_mode: Option<LogPartitionEventsMode>,
pub commit_config: CommitConfig,
pub connect_config: ConnectConfig,
}
impl Builder {
env_ctors!();
fn fill_from_env_prefixed_internal<T: AsRef<str>>(&mut self, prefix: T) -> Result<(), Error> {
if self.subscription_id.is_none() {
self.subscription_id = SubscriptionId::try_from_env_prefixed(prefix.as_ref())?;
}
if self.instrumentation.is_none() {
self.instrumentation = Default::default();
}
if self.tick_interval_millis.is_none() {
self.tick_interval_millis = TickIntervalMillis::try_from_env_prefixed(prefix.as_ref())?;
}
if self.handler_inactivity_timeout_secs.is_none() {
self.handler_inactivity_timeout_secs =
HandlerInactivityTimeoutSecs::try_from_env_prefixed(prefix.as_ref())?;
}
if self.partition_inactivity_timeout_secs.is_none() {
self.partition_inactivity_timeout_secs =
PartitionInactivityTimeoutSecs::try_from_env_prefixed(prefix.as_ref())?;
}
if self.stream_dead_policy.is_none() {
self.stream_dead_policy = StreamDeadPolicy::try_from_env_prefixed(prefix.as_ref())?;
}
if self.warn_no_frames_secs.is_none() {
self.warn_no_frames_secs = WarnNoFramesSecs::try_from_env_prefixed(prefix.as_ref())?;
}
if self.warn_no_events_secs.is_none() {
self.warn_no_events_secs = WarnNoEventsSecs::try_from_env_prefixed(prefix.as_ref())?;
}
if self.dispatch_mode.is_none() {
self.dispatch_mode = DispatchMode::try_from_env_prefixed(prefix.as_ref())?;
}
if self.log_partition_events_mode.is_none() {
self.log_partition_events_mode =
LogPartitionEventsMode::try_from_env_prefixed(prefix.as_ref())?;
}
self.commit_config.fill_from_env_prefixed(prefix.as_ref())?;
self.connect_config
.fill_from_env_prefixed(prefix.as_ref())?;
Ok(())
}
pub fn subscription_id(mut self, subscription_id: SubscriptionId) -> Self {
self.subscription_id = Some(subscription_id);
self
}
pub fn stream_parameters(mut self, params: StreamParameters) -> Self {
self.connect_config.stream_parameters = params;
self
}
pub fn instrumentation(mut self, instr: Instrumentation) -> Self {
self.instrumentation = Some(instr);
self
}
pub fn tick_interval_millis<T: Into<TickIntervalMillis>>(mut self, tick_interval: T) -> Self {
self.tick_interval_millis = Some(tick_interval.into());
self
}
pub fn handler_inactivity_timeout_secs<T: Into<HandlerInactivityTimeoutSecs>>(
mut self,
handler_inactivity_timeout_secs: T,
) -> Self {
self.handler_inactivity_timeout_secs = Some(handler_inactivity_timeout_secs.into());
self
}
pub fn partition_inactivity_timeout_secs<T: Into<PartitionInactivityTimeoutSecs>>(
mut self,
partition_inactivity_timeout_secs: T,
) -> Self {
self.partition_inactivity_timeout_secs = Some(partition_inactivity_timeout_secs.into());
self
}
pub fn stream_dead_policy<T: Into<StreamDeadPolicy>>(mut self, stream_dead_policy: T) -> Self {
self.stream_dead_policy = Some(stream_dead_policy.into());
self
}
pub fn warn_no_frames_secs<T: Into<WarnNoFramesSecs>>(
mut self,
warn_no_frames_secs: T,
) -> Self {
self.warn_no_frames_secs = Some(warn_no_frames_secs.into());
self
}
pub fn warn_no_events_secs<T: Into<WarnNoEventsSecs>>(
mut self,
warn_no_events_secs: T,
) -> Self {
self.warn_no_events_secs = Some(warn_no_events_secs.into());
self
}
pub fn dispatch_mode(mut self, dispatch_mode: DispatchMode) -> Self {
self.dispatch_mode = Some(dispatch_mode);
self
}
pub fn log_partition_events_mode(
mut self,
log_partition_events_mode: LogPartitionEventsMode,
) -> Self {
self.log_partition_events_mode = Some(log_partition_events_mode);
self
}
pub fn commit_config(mut self, connect_config: ConnectConfig) -> Self {
self.connect_config = connect_config;
self
}
pub fn connect_config(mut self, connect_config: ConnectConfig) -> Self {
self.connect_config = connect_config;
self
}
pub fn configure_connector<F>(self, mut f: F) -> Self
where
F: FnMut(ConnectConfig) -> ConnectConfig,
{
self.try_configure_connector(|config| Ok(f(config)))
.unwrap()
}
pub fn try_configure_connector<F>(mut self, mut f: F) -> Result<Self, Error>
where
F: FnMut(ConnectConfig) -> Result<ConnectConfig, Error>,
{
let connect_config = self.connect_config;
self.connect_config = f(connect_config)?;
Ok(self)
}
pub fn configure_committer<F>(self, mut f: F) -> Self
where
F: FnMut(CommitConfig) -> CommitConfig,
{
self.try_configure_committer(|config| Ok(f(config)))
.unwrap()
}
pub fn try_configure_committer<F>(mut self, mut f: F) -> Result<Self, Error>
where
F: FnMut(CommitConfig) -> Result<CommitConfig, Error>,
{
let commit_config = self.commit_config;
self.commit_config = f(commit_config)?;
Ok(self)
}
pub fn configure_stream_parameters<F>(self, mut f: F) -> Self
where
F: FnMut(StreamParameters) -> StreamParameters,
{
self.try_configure_stream_parameters(|params| Ok(f(params)))
.unwrap()
}
pub fn try_configure_stream_parameters<F>(mut self, f: F) -> Result<Self, Error>
where
F: FnMut(StreamParameters) -> Result<StreamParameters, Error>,
{
self.connect_config = self.connect_config.try_configure_stream_parameters(f)?;
Ok(self)
}
#[cfg(feature = "metrix")]
pub fn metrix(mut self, metrix: Metrix, detail: MetricsDetailLevel) -> Self {
self.instrumentation = Some(Instrumentation::metrix(metrix, detail));
self
}
#[cfg(feature = "metrix")]
pub fn metrix_mounted<A: AggregatesProcessors>(
mut self,
config: &MetrixConfig,
detail: MetricsDetailLevel,
processor: &mut A,
) -> Self {
let instr = Instrumentation::metrix_mounted(config, detail, processor);
self.instrumentation = Some(instr);
self
}
pub fn apply_defaults(&mut self) {
let instrumentation = self
.instrumentation
.clone()
.unwrap_or_else(Instrumentation::default);
let tick_interval = self.tick_interval_millis.unwrap_or_default().adjust();
let handler_inactivity_timeout = self.handler_inactivity_timeout_secs;
let partition_inactivity_timeout =
Some(self.partition_inactivity_timeout_secs.unwrap_or_default());
let stream_dead_policy = self.stream_dead_policy.unwrap_or_default();
let warn_no_frames_secs = Some(self.warn_no_frames_secs.unwrap_or_default());
let warn_no_events_secs = Some(self.warn_no_events_secs.unwrap_or_default());
let dispatch_mode = Some(self.dispatch_mode.unwrap_or_default());
let log_partition_events_mode = Some(self.log_partition_events_mode.unwrap_or_default());
self.connect_config.apply_defaults();
if self.commit_config.commit_strategy.is_none() {
self.commit_config.commit_strategy =
Some(CommitStrategy::derive_from_stream_parameters(
&self.connect_config.stream_parameters,
))
}
set_stream_commit_timeout(&mut self.commit_config, &self.connect_config);
self.commit_config.apply_defaults();
self.instrumentation = Some(instrumentation);
self.tick_interval_millis = Some(tick_interval);
self.handler_inactivity_timeout_secs = handler_inactivity_timeout;
self.partition_inactivity_timeout_secs = partition_inactivity_timeout;
self.stream_dead_policy = Some(stream_dead_policy);
self.warn_no_frames_secs = warn_no_frames_secs;
self.warn_no_events_secs = warn_no_events_secs;
self.dispatch_mode = dispatch_mode;
self.log_partition_events_mode = log_partition_events_mode;
}
pub fn build_with<C, HF, L>(
&self,
api_client: C,
handler_factory: HF,
logs: L,
) -> Result<Consumer, Error>
where
C: StreamingEssentials + Send + Sync + 'static + Clone,
HF: BatchHandlerFactory,
L: LoggingAdapter,
{
let config = self.config()?;
let inner = Inner {
config,
api_client,
handler_factory: Arc::new(handler_factory),
logging_adapter: Arc::new(logs),
lifecycle_listeners: LifecycleListeners::default(),
};
Ok(Consumer {
inner: Arc::new(inner),
})
}
pub fn build_with_tracker<C, HF, L>(
&self,
api_client: C,
handler_factory: HF,
logs: L,
) -> Result<Consumer, Error>
where
C: StreamingEssentials + SubscriptionApi + Send + Sync + 'static + Clone,
HF: BatchHandlerFactory,
L: LoggingAdapter + Clone,
{
let consumer = self.build_with(api_client.clone(), handler_factory, logs.clone())?;
if let Some(instrumentation) = self.instrumentation.clone() {
consumer.add_lifecycle_listener(
crate::tools::subscription_stats::SubscriptionStatsReporter::new(
api_client,
instrumentation,
logs,
),
);
}
Ok(consumer)
}
fn config(&self) -> Result<Config, Error> {
let subscription_id = mandatory(self.subscription_id, "subscription_id")?;
let instrumentation = self
.instrumentation
.clone()
.unwrap_or_else(Instrumentation::default);
let tick_interval = self.tick_interval_millis.unwrap_or_default().adjust();
let handler_inactivity_timeout = self.handler_inactivity_timeout_secs.unwrap_or_default();
let partition_inactivity_timeout =
self.partition_inactivity_timeout_secs.unwrap_or_default();
let stream_dead_policy = self.stream_dead_policy.unwrap_or_default();
stream_dead_policy.validate()?;
let warn_no_frames = self.warn_no_frames_secs.unwrap_or_default();
let warn_no_events = self.warn_no_events_secs.unwrap_or_default();
let dispatch_mode = self.dispatch_mode.clone().unwrap_or_default();
let log_partition_events_mode = self.log_partition_events_mode.unwrap_or_default();
let mut connect_config = self.connect_config.clone();
connect_config.apply_defaults();
let mut commit_config = self.commit_config.clone();
if let Some(commit_strategy) = commit_config.commit_strategy {
commit_strategy.validate()?;
} else {
commit_config.commit_strategy = Some(CommitStrategy::derive_from_stream_parameters(
&connect_config.stream_parameters,
));
}
set_stream_commit_timeout(&mut commit_config, &connect_config);
commit_config.apply_defaults();
let config = Config {
subscription_id,
instrumentation,
tick_interval,
handler_inactivity_timeout,
partition_inactivity_timeout,
stream_dead_policy,
warn_no_frames,
warn_no_events,
dispatch_mode,
log_partition_events_mode,
commit_config,
connect_config,
};
Ok(config)
}
}
fn set_stream_commit_timeout(commit_config: &mut CommitConfig, connect_config: &ConnectConfig) {
if commit_config.stream_commit_timeout_secs.is_none() {
let timeout = connect_config
.stream_parameters
.commit_timeout_secs
.unwrap_or_default();
commit_config.stream_commit_timeout_secs = Some(timeout);
}
}