use super::service_endpoints;
use crate::data_source::{DataSource, NullDataSource, PollingDataSource, StreamingDataSource};
use crate::feature_requester_builders::{FeatureRequesterFactory, HyperFeatureRequesterBuilder};
use eventsource_client::HttpsConnector;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use thiserror::Error;
#[cfg(test)]
use super::data_source;
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum BuildError {
#[error("data source factory failed to build: {0}")]
InvalidConfig(String),
}
const DEFAULT_INITIAL_RECONNECT_DELAY: Duration = Duration::from_secs(1);
const MINIMUM_POLL_INTERVAL: Duration = Duration::from_secs(30);
pub trait DataSourceFactory {
fn build(
&self,
endpoints: &service_endpoints::ServiceEndpoints,
sdk_key: &str,
tags: Option<String>,
) -> Result<Arc<dyn DataSource>, BuildError>;
fn to_owned(&self) -> Box<dyn DataSourceFactory>;
}
#[derive(Clone)]
pub struct StreamingDataSourceBuilder {
initial_reconnect_delay: Duration,
connector: Option<HttpsConnector>,
}
impl StreamingDataSourceBuilder {
pub fn new() -> Self {
Self {
initial_reconnect_delay: DEFAULT_INITIAL_RECONNECT_DELAY,
connector: None,
}
}
pub fn initial_reconnect_delay(&mut self, duration: Duration) -> &mut Self {
self.initial_reconnect_delay = duration;
self
}
pub fn https_connector(&mut self, connector: HttpsConnector) -> &mut Self {
self.connector = Some(connector);
self
}
}
impl DataSourceFactory for StreamingDataSourceBuilder {
fn build(
&self,
endpoints: &service_endpoints::ServiceEndpoints,
sdk_key: &str,
tags: Option<String>,
) -> Result<Arc<dyn DataSource>, BuildError> {
let data_source = match &self.connector {
None => StreamingDataSource::new(
endpoints.streaming_base_url(),
sdk_key,
self.initial_reconnect_delay,
&tags,
),
Some(connector) => StreamingDataSource::new_with_connector(
endpoints.streaming_base_url(),
sdk_key,
self.initial_reconnect_delay,
&tags,
connector.clone(),
),
}
.map_err(|e| BuildError::InvalidConfig(format!("invalid stream_base_url: {:?}", e)))?;
Ok(Arc::new(data_source))
}
fn to_owned(&self) -> Box<dyn DataSourceFactory> {
Box::new(self.clone())
}
}
impl Default for StreamingDataSourceBuilder {
fn default() -> Self {
StreamingDataSourceBuilder::new()
}
}
#[derive(Clone)]
pub struct NullDataSourceBuilder {}
impl NullDataSourceBuilder {
pub fn new() -> Self {
Self {}
}
}
impl DataSourceFactory for NullDataSourceBuilder {
fn build(
&self,
_: &service_endpoints::ServiceEndpoints,
_: &str,
_: Option<String>,
) -> Result<Arc<dyn DataSource>, BuildError> {
Ok(Arc::new(NullDataSource::new()))
}
fn to_owned(&self) -> Box<dyn DataSourceFactory> {
Box::new(self.clone())
}
}
impl Default for NullDataSourceBuilder {
fn default() -> Self {
NullDataSourceBuilder::new()
}
}
#[derive(Clone)]
pub struct PollingDataSourceBuilder {
poll_interval: Duration,
feature_requester_factory: Option<Arc<Mutex<Box<dyn FeatureRequesterFactory>>>>,
}
impl PollingDataSourceBuilder {
pub fn new() -> Self {
Self {
poll_interval: MINIMUM_POLL_INTERVAL,
feature_requester_factory: None,
}
}
pub fn poll_interval(&mut self, poll_interval: Duration) -> &mut Self {
self.poll_interval = std::cmp::max(poll_interval, MINIMUM_POLL_INTERVAL);
self
}
pub fn feature_requester_factory(
&mut self,
feature_requester_factory: Box<dyn FeatureRequesterFactory>,
) -> &mut Self {
self.feature_requester_factory = Some(Arc::new(Mutex::new(feature_requester_factory)));
self
}
}
impl DataSourceFactory for PollingDataSourceBuilder {
fn build(
&self,
endpoints: &service_endpoints::ServiceEndpoints,
sdk_key: &str,
tags: Option<String>,
) -> Result<Arc<dyn DataSource>, BuildError> {
let feature_requester_factory: Arc<Mutex<Box<dyn FeatureRequesterFactory>>> =
match &self.feature_requester_factory {
Some(factory) => factory.clone(),
_ => Arc::new(Mutex::new(Box::new(HyperFeatureRequesterBuilder::new(
endpoints.polling_base_url(),
sdk_key,
)))),
};
let data_source =
PollingDataSource::new(feature_requester_factory, self.poll_interval, tags);
Ok(Arc::new(data_source))
}
fn to_owned(&self) -> Box<dyn DataSourceFactory> {
Box::new(self.clone())
}
}
impl Default for PollingDataSourceBuilder {
fn default() -> Self {
PollingDataSourceBuilder::new()
}
}
#[cfg(test)]
#[derive(Clone)]
pub(crate) struct MockDataSourceBuilder {
data_source: Option<Arc<data_source::MockDataSource>>,
}
#[cfg(test)]
impl MockDataSourceBuilder {
pub fn new() -> MockDataSourceBuilder {
MockDataSourceBuilder { data_source: None }
}
pub fn data_source(
&mut self,
data_source: Arc<data_source::MockDataSource>,
) -> &mut MockDataSourceBuilder {
self.data_source = Some(data_source);
self
}
}
#[cfg(test)]
impl DataSourceFactory for MockDataSourceBuilder {
fn build(
&self,
_endpoints: &service_endpoints::ServiceEndpoints,
_sdk_key: &str,
_tags: Option<String>,
) -> Result<Arc<dyn DataSource>, BuildError> {
return Ok(self.data_source.as_ref().unwrap().clone());
}
fn to_owned(&self) -> Box<dyn DataSourceFactory> {
Box::new(self.clone())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn default_stream_builder_has_correct_defaults() {
let builder = StreamingDataSourceBuilder::new();
assert_eq!(
builder.initial_reconnect_delay,
DEFAULT_INITIAL_RECONNECT_DELAY
);
}
#[test]
fn default_polling_builder_has_correct_defaults() {
let builder = PollingDataSourceBuilder::new();
assert_eq!(builder.poll_interval, MINIMUM_POLL_INTERVAL,);
}
#[test]
fn initial_reconnect_delay_for_streaming_can_be_adjusted() {
let mut builder = StreamingDataSourceBuilder::new();
builder.initial_reconnect_delay(Duration::from_secs(1234));
assert_eq!(builder.initial_reconnect_delay, Duration::from_secs(1234));
}
}