Skip to main content

launchdarkly_server_sdk/
data_source_builders.rs

1use super::service_endpoints;
2use crate::data_source::{DataSource, NullDataSource, PollingDataSource, StreamingDataSource};
3use crate::feature_requester_builders::{FeatureRequesterFactory, HttpFeatureRequesterBuilder};
4use launchdarkly_sdk_transport::HttpTransport;
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use thiserror::Error;
8
9#[cfg(test)]
10use super::data_source;
11
12/// Error type used to represent failures when building a DataSource instance.
13#[non_exhaustive]
14#[derive(Debug, Error)]
15pub enum BuildError {
16    /// Error used when a configuration setting is invalid. This typically indicates an invalid URL.
17    #[error("data source factory failed to build: {0}")]
18    InvalidConfig(String),
19}
20
/// Initial reconnect delay a new [StreamingDataSourceBuilder] starts with.
const DEFAULT_INITIAL_RECONNECT_DELAY: Duration = Duration::from_secs(1);
/// Poll interval a new [PollingDataSourceBuilder] starts with; its `poll_interval` setter also
/// uses this value as the lower bound.
const MINIMUM_POLL_INTERVAL: Duration = Duration::from_secs(30);
23
24/// Trait which allows creation of data sources. Should be implemented by data source builder types.
25pub trait DataSourceFactory {
26    fn build(
27        &self,
28        endpoints: &service_endpoints::ServiceEndpoints,
29        sdk_key: &str,
30        tags: Option<String>,
31    ) -> Result<Arc<dyn DataSource>, BuildError>;
32
33    /// Sets the per-SDK-instance identifier used to populate the
34    /// `X-LaunchDarkly-Instance-Id` header on outbound requests. LD-owned builders override
35    /// this to stamp the header on streaming, polling, and event requests. External
36    /// implementors of this trait may ignore this — the default no-op is correct unless the
37    /// implementor constructs HTTP clients that talk to LaunchDarkly's API.
38    fn set_instance_id(&mut self, _instance_id: String) {}
39
40    fn to_owned(&self) -> Box<dyn DataSourceFactory>;
41}
42
43/// Contains methods for configuring the streaming data source.
44///
45/// By default, the SDK uses a streaming connection to receive feature flag data from LaunchDarkly. If you want
46/// to customize the behavior of the connection, create a builder [StreamingDataSourceBuilder::new],
47/// change its properties with the methods of this class, and pass it to
48/// [crate::ConfigBuilder::data_source].
49///
50/// # Examples
51///
52/// Adjust the initial reconnect delay.
53/// ```
54/// # use launchdarkly_server_sdk::{StreamingDataSourceBuilder, ConfigBuilder};
55/// # use launchdarkly_sdk_transport::HyperTransport;
56/// # use std::time::Duration;
57/// # fn main() {
58///     ConfigBuilder::new("sdk-key").data_source(StreamingDataSourceBuilder::<HyperTransport>::new()
59///         .initial_reconnect_delay(Duration::from_secs(10)));
60/// # }
61/// ```
62#[derive(Clone)]
63pub struct StreamingDataSourceBuilder<T: launchdarkly_sdk_transport::HttpTransport> {
64    initial_reconnect_delay: Duration,
65    transport: Option<T>,
66    instance_id: Option<String>,
67}
68
69impl<T: launchdarkly_sdk_transport::HttpTransport> StreamingDataSourceBuilder<T> {
70    /// Create a new instance of the [StreamingDataSourceBuilder] with default values.
71    pub fn new() -> Self {
72        Self {
73            initial_reconnect_delay: DEFAULT_INITIAL_RECONNECT_DELAY,
74            transport: None,
75            instance_id: None,
76        }
77    }
78
79    /// Sets the initial reconnect delay for the streaming connection.
80    pub fn initial_reconnect_delay(&mut self, duration: Duration) -> &mut Self {
81        self.initial_reconnect_delay = duration;
82        self
83    }
84
85    /// Sets the transport for the event source client to use. This allows for re-use of a
86    /// transport between multiple client instances. This is especially useful for the
87    /// `sdk-test-harness` where many client instances are created throughout the test and reading
88    /// the native certificates is a substantial portion of the runtime.
89    pub fn transport(&mut self, transport: T) -> &mut Self {
90        self.transport = Some(transport);
91        self
92    }
93}
94
95impl<T: launchdarkly_sdk_transport::HttpTransport> DataSourceFactory
96    for StreamingDataSourceBuilder<T>
97{
98    fn build(
99        &self,
100        endpoints: &service_endpoints::ServiceEndpoints,
101        sdk_key: &str,
102        tags: Option<String>,
103    ) -> Result<Arc<dyn DataSource>, BuildError> {
104        let instance_id = self.instance_id.as_deref();
105        let data_source_result = match &self.transport {
106            #[cfg(any(
107                feature = "hyper-rustls-native-roots",
108                feature = "hyper-rustls-webpki-roots",
109                feature = "native-tls"
110            ))]
111            None => {
112                let transport =
113                    launchdarkly_sdk_transport::HyperTransport::new_https().map_err(|e| {
114                        BuildError::InvalidConfig(format!(
115                            "failed to create default https transport: {e:?}"
116                        ))
117                    })?;
118                Ok(StreamingDataSource::new(
119                    endpoints.streaming_base_url(),
120                    sdk_key,
121                    self.initial_reconnect_delay,
122                    &tags,
123                    instance_id,
124                    transport,
125                ))
126            }
127            #[cfg(not(any(
128                feature = "hyper-rustls-native-roots",
129                feature = "hyper-rustls-webpki-roots",
130                feature = "native-tls"
131            )))]
132            None => Err(BuildError::InvalidConfig(
133                "https connector required when hyper-rustls-native-roots, hyper-rustls-webpki-roots, or native-tls features are disabled".into(),
134            )),
135            Some(transport) => Ok(StreamingDataSource::new(
136                endpoints.streaming_base_url(),
137                sdk_key,
138                self.initial_reconnect_delay,
139                &tags,
140                instance_id,
141                transport.clone(),
142            )),
143        };
144        let data_source = data_source_result?
145            .map_err(|e| BuildError::InvalidConfig(format!("invalid stream_base_url: {e:?}")))?;
146        Ok(Arc::new(data_source))
147    }
148
149    fn set_instance_id(&mut self, instance_id: String) {
150        self.instance_id = Some(instance_id);
151    }
152
153    fn to_owned(&self) -> Box<dyn DataSourceFactory> {
154        Box::new(self.clone())
155    }
156}
157
158impl<T: launchdarkly_sdk_transport::HttpTransport> Default for StreamingDataSourceBuilder<T> {
159    fn default() -> Self {
160        StreamingDataSourceBuilder::new()
161    }
162}
163
/// Builder whose [DataSourceFactory] implementation produces a [NullDataSource].
///
/// The builder has no settings to configure.
#[derive(Clone)]
pub struct NullDataSourceBuilder {}

impl NullDataSourceBuilder {
    /// Create a new instance of the [NullDataSourceBuilder].
    pub fn new() -> Self {
        NullDataSourceBuilder {}
    }
}
172
173impl DataSourceFactory for NullDataSourceBuilder {
174    fn build(
175        &self,
176        _: &service_endpoints::ServiceEndpoints,
177        _: &str,
178        _: Option<String>,
179    ) -> Result<Arc<dyn DataSource>, BuildError> {
180        Ok(Arc::new(NullDataSource::new()))
181    }
182
183    fn to_owned(&self) -> Box<dyn DataSourceFactory> {
184        Box::new(self.clone())
185    }
186}
187
188impl Default for NullDataSourceBuilder {
189    fn default() -> Self {
190        NullDataSourceBuilder::new()
191    }
192}
193
194/// Contains methods for configuring the polling data source.
195///
196/// Polling is not the default behavior; by default, the SDK uses a streaming connection to receive feature flag
197/// data from LaunchDarkly. In polling mode, the SDK instead makes a new HTTP request to LaunchDarkly at regular
198/// intervals. HTTP caching allows it to avoid redundantly downloading data if there have been no changes, but
199/// polling is still less efficient than streaming and should only be used on the advice of LaunchDarkly support.
200///
201/// To use polling mode, create a builder [PollingDataSourceBuilder::new], change its properties
202/// with the methods of this class, and pass it to the [crate::ConfigBuilder::data_source].
203///
204/// # Examples
205///
206/// Adjust the initial reconnect delay.
207/// ```
208/// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder};
209/// # use launchdarkly_sdk_transport::HyperTransport;
210/// # use std::time::Duration;
211/// # fn main() {
212///     ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::<HyperTransport>::new()
213///         .poll_interval(Duration::from_secs(60)));
214/// # }
215/// ```
216#[derive(Clone)]
217pub struct PollingDataSourceBuilder<T: HttpTransport = launchdarkly_sdk_transport::HyperTransport> {
218    poll_interval: Duration,
219    transport: Option<T>,
220    instance_id: Option<String>,
221}
222
223/// Contains methods for configuring the polling data source.
224///
225/// Polling is not the default behavior; by default, the SDK uses a streaming connection to receive
226/// feature flag data from LaunchDarkly. In polling mode, the SDK instead makes a new HTTP request
227/// to LaunchDarkly at regular intervals. HTTP caching allows it to avoid redundantly downloading
228/// data if there have been no changes, but polling is still less efficient than streaming and
229/// should only be used on the advice of LaunchDarkly support.
230///
231/// To use polling mode, create a builder with [PollingDataSourceBuilder::new], set its properties
232/// with the methods of this class, and pass it to [crate::ConfigBuilder::data_source].
233///
234/// # Examples
235///
236/// Adjust the poll interval.
237/// ```
238/// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder};
239/// # use launchdarkly_sdk_transport::HyperTransport;
240/// # use std::time::Duration;
241/// # fn main() {
242///     ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::<HyperTransport>::new()
243///         .poll_interval(Duration::from_secs(60)));
244/// # }
245/// ```
246impl<T: HttpTransport> PollingDataSourceBuilder<T> {
247    /// Create a new instance of the [PollingDataSourceBuilder] with default values.
248    pub fn new() -> Self {
249        Self {
250            poll_interval: MINIMUM_POLL_INTERVAL,
251            transport: None,
252            instance_id: None,
253        }
254    }
255
256    /// Sets the poll interval for the polling connection.
257    ///
258    /// The default and minimum value is 30 seconds. Values less than this will be set to the
259    /// default.
260    pub fn poll_interval(&mut self, poll_interval: Duration) -> &mut Self {
261        self.poll_interval = std::cmp::max(poll_interval, MINIMUM_POLL_INTERVAL);
262        self
263    }
264
265    /// Sets the transport for the polling client to use. This allows for re-use of a transport
266    /// between multiple client instances. This is especially useful for the `sdk-test-harness`
267    /// where many client instances are created throughout the test and reading the native
268    /// certificates is a substantial portion of the runtime.
269    pub fn transport(&mut self, transport: T) -> &mut Self {
270        self.transport = Some(transport);
271        self
272    }
273}
274
275impl<T: HttpTransport> DataSourceFactory for PollingDataSourceBuilder<T> {
276    fn build(
277        &self,
278        endpoints: &service_endpoints::ServiceEndpoints,
279        sdk_key: &str,
280        tags: Option<String>,
281    ) -> Result<Arc<dyn DataSource>, BuildError> {
282        let instance_id = self.instance_id.as_deref();
283        let feature_requester_builder: Result<Box<dyn FeatureRequesterFactory>, BuildError> =
284            match &self.transport {
285                #[cfg(any(
286                    feature = "hyper-rustls-native-roots",
287                    feature = "hyper-rustls-webpki-roots",
288                    feature = "native-tls"
289                ))]
290                None => {
291                    let transport = launchdarkly_sdk_transport::HyperTransport::new_https()
292                        .map_err(|e| {
293                            BuildError::InvalidConfig(format!(
294                                "failed to create default https transport: {e:?}"
295                            ))
296                        })?;
297                    let mut builder = HttpFeatureRequesterBuilder::new(
298                        endpoints.polling_base_url(),
299                        sdk_key,
300                        transport,
301                    );
302                    if let Some(instance_id) = instance_id {
303                        builder = builder.with_instance_id(instance_id);
304                    }
305                    Ok(Box::new(builder))
306                }
307                #[cfg(not(any(
308                    feature = "hyper-rustls-native-roots",
309                    feature = "hyper-rustls-webpki-roots",
310                    feature = "native-tls"
311                )))]
312                None => Err(BuildError::InvalidConfig(
313                    "transport is required when hyper-rustls-native-roots, hyper-rustls-webpki-roots, or native-tls features are disabled".into(),
314                )),
315                Some(transport) => {
316                    let mut builder = HttpFeatureRequesterBuilder::new(
317                        endpoints.polling_base_url(),
318                        sdk_key,
319                        transport.clone(),
320                    );
321                    if let Some(instance_id) = instance_id {
322                        builder = builder.with_instance_id(instance_id);
323                    }
324                    Ok(Box::new(builder))
325                }
326            };
327
328        let feature_requester_factory: Arc<Mutex<Box<dyn FeatureRequesterFactory>>> =
329            Arc::new(Mutex::new(feature_requester_builder?));
330
331        let data_source =
332            PollingDataSource::new(feature_requester_factory, self.poll_interval, tags);
333        Ok(Arc::new(data_source))
334    }
335
336    fn set_instance_id(&mut self, instance_id: String) {
337        self.instance_id = Some(instance_id);
338    }
339
340    fn to_owned(&self) -> Box<dyn DataSourceFactory> {
341        Box::new(self.clone())
342    }
343}
344
345impl<T: HttpTransport> Default for PollingDataSourceBuilder<T> {
346    fn default() -> Self {
347        PollingDataSourceBuilder::new()
348    }
349}
350
/// For testing you can use this builder to inject the MockDataSource.
#[cfg(test)]
#[derive(Clone)]
pub(crate) struct MockDataSourceBuilder {
    /// The mock that `build` returns; `None` until `data_source` has been called.
    data_source: Option<Arc<data_source::MockDataSource>>,
}
357
#[cfg(test)]
impl MockDataSourceBuilder {
    /// Creates a builder with no mock data source set.
    pub fn new() -> Self {
        Self { data_source: None }
    }

    /// Stores the mock data source that `build` will return, and returns the builder so calls
    /// can be chained.
    pub fn data_source(&mut self, data_source: Arc<data_source::MockDataSource>) -> &mut Self {
        self.data_source = Some(data_source);
        self
    }
}
372
#[cfg(test)]
impl DataSourceFactory for MockDataSourceBuilder {
    /// Returns another handle to the injected mock; the endpoints, SDK key, and tags are
    /// ignored.
    ///
    /// # Panics
    ///
    /// Panics if no mock was set with `data_source` beforehand.
    fn build(
        &self,
        _endpoints: &service_endpoints::ServiceEndpoints,
        _sdk_key: &str,
        _tags: Option<String>,
    ) -> Result<Arc<dyn DataSource>, BuildError> {
        let shared: Arc<dyn DataSource> = self.data_source.clone().unwrap();
        Ok(shared)
    }

    /// Returns a boxed clone of this builder.
    fn to_owned(&self) -> Box<dyn DataSourceFactory> {
        Box::new(self.clone())
    }
}
388
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use launchdarkly_sdk_transport::{HyperTransport, Request, ResponseFuture};

    use super::*;

    /// A new streaming builder starts with the default initial reconnect delay.
    #[test]
    fn default_stream_builder_has_correct_defaults() {
        let builder = StreamingDataSourceBuilder::<HyperTransport>::new();

        assert_eq!(
            builder.initial_reconnect_delay,
            DEFAULT_INITIAL_RECONNECT_DELAY
        );
    }

    /// Building succeeds with a caller-supplied transport, without going through the default
    /// transport path.
    #[test]
    fn stream_builder_can_use_custom_transport() {
        #[derive(Debug, Clone)]
        struct TestTransport;

        impl HttpTransport for TestTransport {
            fn request(&self, _request: Request<Option<Bytes>>) -> ResponseFuture {
                // this won't be called during the test
                unreachable!();
            }
        }

        let endpoints = crate::ServiceEndpointsBuilder::new().build().unwrap();
        let mut builder = StreamingDataSourceBuilder::<TestTransport>::new();
        builder.transport(TestTransport);

        let outcome = builder.build(&endpoints, "test", None);
        assert!(outcome.is_ok());
    }

    /// A new polling builder starts with the minimum poll interval.
    #[test]
    fn default_polling_builder_has_correct_defaults() {
        let builder = PollingDataSourceBuilder::<HyperTransport>::new();
        assert_eq!(builder.poll_interval, MINIMUM_POLL_INTERVAL);
    }

    /// The reconnect delay setter stores exactly the value it is given.
    #[test]
    fn initial_reconnect_delay_for_streaming_can_be_adjusted() {
        let custom_delay = Duration::from_secs(1234);
        let mut builder = StreamingDataSourceBuilder::<HyperTransport>::new();
        builder.initial_reconnect_delay(custom_delay);
        assert_eq!(builder.initial_reconnect_delay, custom_delay);
    }
}
442}