Skip to main content

launchdarkly_server_sdk/
data_source_builders.rs

1use super::service_endpoints;
2use crate::data_source::{DataSource, NullDataSource, PollingDataSource, StreamingDataSource};
3use crate::feature_requester_builders::{FeatureRequesterFactory, HttpFeatureRequesterBuilder};
4use launchdarkly_sdk_transport::HttpTransport;
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use thiserror::Error;
8
9#[cfg(test)]
10use super::data_source;
11
12/// Error type used to represent failures when building a DataSource instance.
13#[non_exhaustive]
14#[derive(Debug, Error)]
15pub enum BuildError {
16    /// Error used when a configuration setting is invalid. This typically indicates an invalid URL.
17    #[error("data source factory failed to build: {0}")]
18    InvalidConfig(String),
19}
20
/// Reconnect delay a [StreamingDataSourceBuilder] starts with unless the caller overrides it.
const DEFAULT_INITIAL_RECONNECT_DELAY: Duration = Duration::from_secs(1);
/// Default poll interval, and the lower bound applied by [PollingDataSourceBuilder::poll_interval].
const MINIMUM_POLL_INTERVAL: Duration = Duration::from_secs(30);
23
24/// Trait which allows creation of data sources. Should be implemented by data source builder types.
25pub trait DataSourceFactory {
26    fn build(
27        &self,
28        endpoints: &service_endpoints::ServiceEndpoints,
29        sdk_key: &str,
30        tags: Option<String>,
31    ) -> Result<Arc<dyn DataSource>, BuildError>;
32    fn to_owned(&self) -> Box<dyn DataSourceFactory>;
33}
34
35/// Contains methods for configuring the streaming data source.
36///
37/// By default, the SDK uses a streaming connection to receive feature flag data from LaunchDarkly. If you want
38/// to customize the behavior of the connection, create a builder [StreamingDataSourceBuilder::new],
39/// change its properties with the methods of this class, and pass it to
40/// [crate::ConfigBuilder::data_source].
41///
42/// # Examples
43///
44/// Adjust the initial reconnect delay.
45/// ```
46/// # use launchdarkly_server_sdk::{StreamingDataSourceBuilder, ConfigBuilder};
47/// # use launchdarkly_sdk_transport::HyperTransport;
48/// # use std::time::Duration;
49/// # fn main() {
50///     ConfigBuilder::new("sdk-key").data_source(StreamingDataSourceBuilder::<HyperTransport>::new()
51///         .initial_reconnect_delay(Duration::from_secs(10)));
52/// # }
53/// ```
54#[derive(Clone)]
55pub struct StreamingDataSourceBuilder<T: launchdarkly_sdk_transport::HttpTransport> {
56    initial_reconnect_delay: Duration,
57    transport: Option<T>,
58}
59
60impl<T: launchdarkly_sdk_transport::HttpTransport> StreamingDataSourceBuilder<T> {
61    /// Create a new instance of the [StreamingDataSourceBuilder] with default values.
62    pub fn new() -> Self {
63        Self {
64            initial_reconnect_delay: DEFAULT_INITIAL_RECONNECT_DELAY,
65            transport: None,
66        }
67    }
68
69    /// Sets the initial reconnect delay for the streaming connection.
70    pub fn initial_reconnect_delay(&mut self, duration: Duration) -> &mut Self {
71        self.initial_reconnect_delay = duration;
72        self
73    }
74
75    /// Sets the transport for the event source client to use. This allows for re-use of a
76    /// transport between multiple client instances. This is especially useful for the
77    /// `sdk-test-harness` where many client instances are created throughout the test and reading
78    /// the native certificates is a substantial portion of the runtime.
79    pub fn transport(&mut self, transport: T) -> &mut Self {
80        self.transport = Some(transport);
81        self
82    }
83}
84
85impl<T: launchdarkly_sdk_transport::HttpTransport> DataSourceFactory
86    for StreamingDataSourceBuilder<T>
87{
88    fn build(
89        &self,
90        endpoints: &service_endpoints::ServiceEndpoints,
91        sdk_key: &str,
92        tags: Option<String>,
93    ) -> Result<Arc<dyn DataSource>, BuildError> {
94        let data_source_result = match &self.transport {
95            #[cfg(any(
96                feature = "hyper-rustls-native-roots",
97                feature = "hyper-rustls-webpki-roots",
98                feature = "native-tls"
99            ))]
100            None => {
101                let transport =
102                    launchdarkly_sdk_transport::HyperTransport::new_https().map_err(|e| {
103                        BuildError::InvalidConfig(format!(
104                            "failed to create default https transport: {e:?}"
105                        ))
106                    })?;
107                Ok(StreamingDataSource::new(
108                    endpoints.streaming_base_url(),
109                    sdk_key,
110                    self.initial_reconnect_delay,
111                    &tags,
112                    transport,
113                ))
114            }
115            #[cfg(not(any(
116                feature = "hyper-rustls-native-roots",
117                feature = "hyper-rustls-webpki-roots",
118                feature = "native-tls"
119            )))]
120            None => Err(BuildError::InvalidConfig(
121                "https connector required when hyper-rustls-native-roots, hyper-rustls-webpki-roots, or native-tls features are disabled".into(),
122            )),
123            Some(transport) => Ok(StreamingDataSource::new(
124                endpoints.streaming_base_url(),
125                sdk_key,
126                self.initial_reconnect_delay,
127                &tags,
128                transport.clone(),
129            )),
130        };
131        let data_source = data_source_result?
132            .map_err(|e| BuildError::InvalidConfig(format!("invalid stream_base_url: {e:?}")))?;
133        Ok(Arc::new(data_source))
134    }
135
136    fn to_owned(&self) -> Box<dyn DataSourceFactory> {
137        Box::new(self.clone())
138    }
139}
140
141impl<T: launchdarkly_sdk_transport::HttpTransport> Default for StreamingDataSourceBuilder<T> {
142    fn default() -> Self {
143        StreamingDataSourceBuilder::new()
144    }
145}
146
/// Builder whose [DataSourceFactory::build] implementation always yields a `NullDataSource`.
/// It carries no configuration.
#[derive(Clone)]
pub struct NullDataSourceBuilder {}
149
150impl NullDataSourceBuilder {
151    pub fn new() -> Self {
152        Self {}
153    }
154}
155
156impl DataSourceFactory for NullDataSourceBuilder {
157    fn build(
158        &self,
159        _: &service_endpoints::ServiceEndpoints,
160        _: &str,
161        _: Option<String>,
162    ) -> Result<Arc<dyn DataSource>, BuildError> {
163        Ok(Arc::new(NullDataSource::new()))
164    }
165
166    fn to_owned(&self) -> Box<dyn DataSourceFactory> {
167        Box::new(self.clone())
168    }
169}
170
171impl Default for NullDataSourceBuilder {
172    fn default() -> Self {
173        NullDataSourceBuilder::new()
174    }
175}
176
177/// Contains methods for configuring the polling data source.
178///
179/// Polling is not the default behavior; by default, the SDK uses a streaming connection to receive feature flag
180/// data from LaunchDarkly. In polling mode, the SDK instead makes a new HTTP request to LaunchDarkly at regular
181/// intervals. HTTP caching allows it to avoid redundantly downloading data if there have been no changes, but
182/// polling is still less efficient than streaming and should only be used on the advice of LaunchDarkly support.
183///
184/// To use polling mode, create a builder [PollingDataSourceBuilder::new], change its properties
185/// with the methods of this class, and pass it to the [crate::ConfigBuilder::data_source].
186///
187/// # Examples
188///
189/// Adjust the initial reconnect delay.
190/// ```
191/// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder};
192/// # use launchdarkly_sdk_transport::HyperTransport;
193/// # use std::time::Duration;
194/// # fn main() {
195///     ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::<HyperTransport>::new()
196///         .poll_interval(Duration::from_secs(60)));
197/// # }
198/// ```
199#[derive(Clone)]
200pub struct PollingDataSourceBuilder<T: HttpTransport = launchdarkly_sdk_transport::HyperTransport> {
201    poll_interval: Duration,
202    transport: Option<T>,
203}
204
205/// Contains methods for configuring the polling data source.
206///
207/// Polling is not the default behavior; by default, the SDK uses a streaming connection to receive
208/// feature flag data from LaunchDarkly. In polling mode, the SDK instead makes a new HTTP request
209/// to LaunchDarkly at regular intervals. HTTP caching allows it to avoid redundantly downloading
210/// data if there have been no changes, but polling is still less efficient than streaming and
211/// should only be used on the advice of LaunchDarkly support.
212///
213/// To use polling mode, create a builder with [PollingDataSourceBuilder::new], set its properties
214/// with the methods of this class, and pass it to [crate::ConfigBuilder::data_source].
215///
216/// # Examples
217///
218/// Adjust the poll interval.
219/// ```
220/// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder};
221/// # use launchdarkly_sdk_transport::HyperTransport;
222/// # use std::time::Duration;
223/// # fn main() {
224///     ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::<HyperTransport>::new()
225///         .poll_interval(Duration::from_secs(60)));
226/// # }
227/// ```
228impl<T: HttpTransport> PollingDataSourceBuilder<T> {
229    /// Create a new instance of the [PollingDataSourceBuilder] with default values.
230    pub fn new() -> Self {
231        Self {
232            poll_interval: MINIMUM_POLL_INTERVAL,
233            transport: None,
234        }
235    }
236
237    /// Sets the poll interval for the polling connection.
238    ///
239    /// The default and minimum value is 30 seconds. Values less than this will be set to the
240    /// default.
241    pub fn poll_interval(&mut self, poll_interval: Duration) -> &mut Self {
242        self.poll_interval = std::cmp::max(poll_interval, MINIMUM_POLL_INTERVAL);
243        self
244    }
245
246    /// Sets the transport for the polling client to use. This allows for re-use of a transport
247    /// between multiple client instances. This is especially useful for the `sdk-test-harness`
248    /// where many client instances are created throughout the test and reading the native
249    /// certificates is a substantial portion of the runtime.
250    pub fn transport(&mut self, transport: T) -> &mut Self {
251        self.transport = Some(transport);
252        self
253    }
254}
255
256impl<T: HttpTransport> DataSourceFactory for PollingDataSourceBuilder<T> {
257    fn build(
258        &self,
259        endpoints: &service_endpoints::ServiceEndpoints,
260        sdk_key: &str,
261        tags: Option<String>,
262    ) -> Result<Arc<dyn DataSource>, BuildError> {
263        let feature_requester_builder: Result<Box<dyn FeatureRequesterFactory>, BuildError> =
264            match &self.transport {
265                #[cfg(any(
266                    feature = "hyper-rustls-native-roots",
267                    feature = "hyper-rustls-webpki-roots",
268                    feature = "native-tls"
269                ))]
270                None => {
271                    let transport = launchdarkly_sdk_transport::HyperTransport::new_https()
272                        .map_err(|e| {
273                            BuildError::InvalidConfig(format!(
274                                "failed to create default https transport: {e:?}"
275                            ))
276                        })?;
277
278                    Ok(Box::new(HttpFeatureRequesterBuilder::new(
279                        endpoints.polling_base_url(),
280                        sdk_key,
281                        transport,
282                    )))
283                }
284                #[cfg(not(any(
285                    feature = "hyper-rustls-native-roots",
286                    feature = "hyper-rustls-webpki-roots",
287                    feature = "native-tls"
288                )))]
289                None => Err(BuildError::InvalidConfig(
290                    "transport is required when hyper-rustls-native-roots, hyper-rustls-webpki-roots, or native-tls features are disabled".into(),
291                )),
292                Some(transport) => Ok(Box::new(HttpFeatureRequesterBuilder::new(
293                    endpoints.polling_base_url(),
294                    sdk_key,
295                    transport.clone(),
296                ))),
297            };
298
299        let feature_requester_factory: Arc<Mutex<Box<dyn FeatureRequesterFactory>>> =
300            Arc::new(Mutex::new(feature_requester_builder?));
301
302        let data_source =
303            PollingDataSource::new(feature_requester_factory, self.poll_interval, tags);
304        Ok(Arc::new(data_source))
305    }
306
307    fn to_owned(&self) -> Box<dyn DataSourceFactory> {
308        Box::new(self.clone())
309    }
310}
311
312impl<T: HttpTransport> Default for PollingDataSourceBuilder<T> {
313    fn default() -> Self {
314        PollingDataSourceBuilder::new()
315    }
316}
317
/// Test-only builder that lets a test inject a `MockDataSource` wherever a
/// [DataSourceFactory] is expected.
#[cfg(test)]
#[derive(Clone)]
pub(crate) struct MockDataSourceBuilder {
    /// Mock handed out by `build`; stays `None` until `data_source` is called.
    data_source: Option<Arc<data_source::MockDataSource>>,
}
324
#[cfg(test)]
impl MockDataSourceBuilder {
    /// Creates a builder that has no mock data source attached yet.
    pub fn new() -> Self {
        Self { data_source: None }
    }

    /// Stores the mock data source that `build` will hand out.
    pub fn data_source(&mut self, data_source: Arc<data_source::MockDataSource>) -> &mut Self {
        self.data_source = Some(data_source);
        self
    }
}
339
#[cfg(test)]
impl DataSourceFactory for MockDataSourceBuilder {
    /// Hands out a clone of the `Arc` holding the configured mock; all arguments are ignored.
    ///
    /// # Panics
    ///
    /// Panics if no mock was supplied through `data_source` beforehand.
    fn build(
        &self,
        _endpoints: &service_endpoints::ServiceEndpoints,
        _sdk_key: &str,
        _tags: Option<String>,
    ) -> Result<Arc<dyn DataSource>, BuildError> {
        let mock = self.data_source.as_ref().unwrap();
        Ok(mock.clone())
    }

    /// Returns a boxed clone of this builder.
    fn to_owned(&self) -> Box<dyn DataSourceFactory> {
        let copy = self.clone();
        Box::new(copy)
    }
}
355
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use launchdarkly_sdk_transport::{HyperTransport, Request, ResponseFuture};

    use super::*;

    #[test]
    fn default_stream_builder_has_correct_defaults() {
        let builder: StreamingDataSourceBuilder<launchdarkly_sdk_transport::HyperTransport> =
            StreamingDataSourceBuilder::new();

        assert_eq!(
            builder.initial_reconnect_delay,
            DEFAULT_INITIAL_RECONNECT_DELAY
        );
    }

    #[test]
    fn stream_builder_can_use_custom_transport() {
        #[derive(Debug, Clone)]
        struct TestTransport;

        impl launchdarkly_sdk_transport::HttpTransport for TestTransport {
            fn request(&self, _request: Request<Option<Bytes>>) -> ResponseFuture {
                // this won't be called during the test
                unreachable!();
            }
        }

        let mut builder = StreamingDataSourceBuilder::new();
        builder.transport(TestTransport);
        assert!(builder
            .build(
                &crate::ServiceEndpointsBuilder::new().build().unwrap(),
                "test",
                None
            )
            .is_ok());
    }

    #[test]
    fn default_polling_builder_has_correct_defaults() {
        let builder = PollingDataSourceBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
        assert_eq!(builder.poll_interval, MINIMUM_POLL_INTERVAL,);
    }

    #[test]
    fn initial_reconnect_delay_for_streaming_can_be_adjusted() {
        let mut builder = StreamingDataSourceBuilder::<HyperTransport>::new();
        builder.initial_reconnect_delay(Duration::from_secs(1234));
        assert_eq!(builder.initial_reconnect_delay, Duration::from_secs(1234));
    }

    // The setter must never let the interval drop below the documented minimum.
    #[test]
    fn poll_interval_below_minimum_is_raised_to_minimum() {
        let mut builder = PollingDataSourceBuilder::<HyperTransport>::new();
        builder.poll_interval(Duration::from_secs(1));
        assert_eq!(builder.poll_interval, MINIMUM_POLL_INTERVAL);
    }

    // Intervals above the minimum are stored as given.
    #[test]
    fn poll_interval_above_minimum_is_kept() {
        let mut builder = PollingDataSourceBuilder::<HyperTransport>::new();
        let interval = MINIMUM_POLL_INTERVAL + Duration::from_secs(30);
        builder.poll_interval(interval);
        assert_eq!(builder.poll_interval, interval);
    }
}