1use super::service_endpoints;
2use crate::data_source::{DataSource, NullDataSource, PollingDataSource, StreamingDataSource};
3use crate::feature_requester_builders::{FeatureRequesterFactory, HttpFeatureRequesterBuilder};
4use launchdarkly_sdk_transport::HttpTransport;
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use thiserror::Error;
8
9#[cfg(test)]
10use super::data_source;
11
12#[non_exhaustive]
14#[derive(Debug, Error)]
15pub enum BuildError {
16 #[error("data source factory failed to build: {0}")]
18 InvalidConfig(String),
19}
20
/// Initial reconnect delay that a freshly created [`StreamingDataSourceBuilder`] starts with.
const DEFAULT_INITIAL_RECONNECT_DELAY: Duration = Duration::from_secs(1);

/// Poll interval that a freshly created [`PollingDataSourceBuilder`] starts with; it is also
/// the floor that [`PollingDataSourceBuilder::poll_interval`] clamps requested values to.
const MINIMUM_POLL_INTERVAL: Duration = Duration::from_secs(30);
23
24pub trait DataSourceFactory {
26 fn build(
27 &self,
28 endpoints: &service_endpoints::ServiceEndpoints,
29 sdk_key: &str,
30 tags: Option<String>,
31 ) -> Result<Arc<dyn DataSource>, BuildError>;
32 fn to_owned(&self) -> Box<dyn DataSourceFactory>;
33}
34
35#[derive(Clone)]
55pub struct StreamingDataSourceBuilder<T: launchdarkly_sdk_transport::HttpTransport> {
56 initial_reconnect_delay: Duration,
57 transport: Option<T>,
58}
59
60impl<T: launchdarkly_sdk_transport::HttpTransport> StreamingDataSourceBuilder<T> {
61 pub fn new() -> Self {
63 Self {
64 initial_reconnect_delay: DEFAULT_INITIAL_RECONNECT_DELAY,
65 transport: None,
66 }
67 }
68
69 pub fn initial_reconnect_delay(&mut self, duration: Duration) -> &mut Self {
71 self.initial_reconnect_delay = duration;
72 self
73 }
74
75 pub fn transport(&mut self, transport: T) -> &mut Self {
80 self.transport = Some(transport);
81 self
82 }
83}
84
85impl<T: launchdarkly_sdk_transport::HttpTransport> DataSourceFactory
86 for StreamingDataSourceBuilder<T>
87{
88 fn build(
89 &self,
90 endpoints: &service_endpoints::ServiceEndpoints,
91 sdk_key: &str,
92 tags: Option<String>,
93 ) -> Result<Arc<dyn DataSource>, BuildError> {
94 let data_source_result = match &self.transport {
95 #[cfg(any(
96 feature = "hyper-rustls-native-roots",
97 feature = "hyper-rustls-webpki-roots",
98 feature = "native-tls"
99 ))]
100 None => {
101 let transport =
102 launchdarkly_sdk_transport::HyperTransport::new_https().map_err(|e| {
103 BuildError::InvalidConfig(format!(
104 "failed to create default https transport: {e:?}"
105 ))
106 })?;
107 Ok(StreamingDataSource::new(
108 endpoints.streaming_base_url(),
109 sdk_key,
110 self.initial_reconnect_delay,
111 &tags,
112 transport,
113 ))
114 }
115 #[cfg(not(any(
116 feature = "hyper-rustls-native-roots",
117 feature = "hyper-rustls-webpki-roots",
118 feature = "native-tls"
119 )))]
120 None => Err(BuildError::InvalidConfig(
121 "https connector required when hyper-rustls-native-roots, hyper-rustls-webpki-roots, or native-tls features are disabled".into(),
122 )),
123 Some(transport) => Ok(StreamingDataSource::new(
124 endpoints.streaming_base_url(),
125 sdk_key,
126 self.initial_reconnect_delay,
127 &tags,
128 transport.clone(),
129 )),
130 };
131 let data_source = data_source_result?
132 .map_err(|e| BuildError::InvalidConfig(format!("invalid stream_base_url: {e:?}")))?;
133 Ok(Arc::new(data_source))
134 }
135
136 fn to_owned(&self) -> Box<dyn DataSourceFactory> {
137 Box::new(self.clone())
138 }
139}
140
141impl<T: launchdarkly_sdk_transport::HttpTransport> Default for StreamingDataSourceBuilder<T> {
142 fn default() -> Self {
143 StreamingDataSourceBuilder::new()
144 }
145}
146
/// Field-less builder whose [`DataSourceFactory`] implementation produces a `NullDataSource`.
#[derive(Clone)]
pub struct NullDataSourceBuilder {}
149
150impl NullDataSourceBuilder {
151 pub fn new() -> Self {
152 Self {}
153 }
154}
155
156impl DataSourceFactory for NullDataSourceBuilder {
157 fn build(
158 &self,
159 _: &service_endpoints::ServiceEndpoints,
160 _: &str,
161 _: Option<String>,
162 ) -> Result<Arc<dyn DataSource>, BuildError> {
163 Ok(Arc::new(NullDataSource::new()))
164 }
165
166 fn to_owned(&self) -> Box<dyn DataSourceFactory> {
167 Box::new(self.clone())
168 }
169}
170
171impl Default for NullDataSourceBuilder {
172 fn default() -> Self {
173 NullDataSourceBuilder::new()
174 }
175}
176
177#[derive(Clone)]
200pub struct PollingDataSourceBuilder<T: HttpTransport = launchdarkly_sdk_transport::HyperTransport> {
201 poll_interval: Duration,
202 transport: Option<T>,
203}
204
205impl<T: HttpTransport> PollingDataSourceBuilder<T> {
229 pub fn new() -> Self {
231 Self {
232 poll_interval: MINIMUM_POLL_INTERVAL,
233 transport: None,
234 }
235 }
236
237 pub fn poll_interval(&mut self, poll_interval: Duration) -> &mut Self {
242 self.poll_interval = std::cmp::max(poll_interval, MINIMUM_POLL_INTERVAL);
243 self
244 }
245
246 pub fn transport(&mut self, transport: T) -> &mut Self {
251 self.transport = Some(transport);
252 self
253 }
254}
255
256impl<T: HttpTransport> DataSourceFactory for PollingDataSourceBuilder<T> {
257 fn build(
258 &self,
259 endpoints: &service_endpoints::ServiceEndpoints,
260 sdk_key: &str,
261 tags: Option<String>,
262 ) -> Result<Arc<dyn DataSource>, BuildError> {
263 let feature_requester_builder: Result<Box<dyn FeatureRequesterFactory>, BuildError> =
264 match &self.transport {
265 #[cfg(any(
266 feature = "hyper-rustls-native-roots",
267 feature = "hyper-rustls-webpki-roots",
268 feature = "native-tls"
269 ))]
270 None => {
271 let transport = launchdarkly_sdk_transport::HyperTransport::new_https()
272 .map_err(|e| {
273 BuildError::InvalidConfig(format!(
274 "failed to create default https transport: {e:?}"
275 ))
276 })?;
277
278 Ok(Box::new(HttpFeatureRequesterBuilder::new(
279 endpoints.polling_base_url(),
280 sdk_key,
281 transport,
282 )))
283 }
284 #[cfg(not(any(
285 feature = "hyper-rustls-native-roots",
286 feature = "hyper-rustls-webpki-roots",
287 feature = "native-tls"
288 )))]
289 None => Err(BuildError::InvalidConfig(
290 "transport is required when hyper-rustls-native-roots, hyper-rustls-webpki-roots, or native-tls features are disabled".into(),
291 )),
292 Some(transport) => Ok(Box::new(HttpFeatureRequesterBuilder::new(
293 endpoints.polling_base_url(),
294 sdk_key,
295 transport.clone(),
296 ))),
297 };
298
299 let feature_requester_factory: Arc<Mutex<Box<dyn FeatureRequesterFactory>>> =
300 Arc::new(Mutex::new(feature_requester_builder?));
301
302 let data_source =
303 PollingDataSource::new(feature_requester_factory, self.poll_interval, tags);
304 Ok(Arc::new(data_source))
305 }
306
307 fn to_owned(&self) -> Box<dyn DataSourceFactory> {
308 Box::new(self.clone())
309 }
310}
311
312impl<T: HttpTransport> Default for PollingDataSourceBuilder<T> {
313 fn default() -> Self {
314 PollingDataSourceBuilder::new()
315 }
316}
317
/// Test-only factory that hands back a pre-supplied mock data source.
#[cfg(test)]
#[derive(Clone)]
pub(crate) struct MockDataSourceBuilder {
    /// The mock to return from `build`; `None` until one is supplied.
    data_source: Option<Arc<data_source::MockDataSource>>,
}
324
#[cfg(test)]
impl MockDataSourceBuilder {
    /// Creates a mock builder with no data source configured yet.
    pub fn new() -> Self {
        Self { data_source: None }
    }

    /// Stores the mock data source that `build` will hand back.
    ///
    /// Returns `&mut Self` so calls can be chained.
    pub fn data_source(&mut self, data_source: Arc<data_source::MockDataSource>) -> &mut Self {
        self.data_source = Some(data_source);

        self
    }
}
339
#[cfg(test)]
impl DataSourceFactory for MockDataSourceBuilder {
    /// Returns the mock data source previously stored on this builder.
    ///
    /// The endpoints, SDK key, and tags are ignored.
    ///
    /// # Errors
    ///
    /// Returns [`BuildError::InvalidConfig`] when no mock data source has been stored, rather
    /// than panicking, so a misconfigured test fails through the normal build-error path.
    fn build(
        &self,
        _endpoints: &service_endpoints::ServiceEndpoints,
        _sdk_key: &str,
        _tags: Option<String>,
    ) -> Result<Arc<dyn DataSource>, BuildError> {
        match &self.data_source {
            Some(data_source) => Ok(data_source.clone()),
            None => Err(BuildError::InvalidConfig(
                "mock data source was not provided before build".into(),
            )),
        }
    }

    /// Returns a boxed clone of this builder as a [`DataSourceFactory`] trait object.
    fn to_owned(&self) -> Box<dyn DataSourceFactory> {
        Box::new(self.clone())
    }
}
355
#[cfg(test)]
mod tests {
    //! Unit tests for the data source builders' defaults and setters.

    use bytes::Bytes;
    use launchdarkly_sdk_transport::{HyperTransport, Request, ResponseFuture};

    use super::*;

    #[test]
    fn default_stream_builder_has_correct_defaults() {
        let builder = StreamingDataSourceBuilder::<HyperTransport>::new();

        assert_eq!(
            builder.initial_reconnect_delay,
            DEFAULT_INITIAL_RECONNECT_DELAY
        );
    }

    #[test]
    fn stream_builder_can_use_custom_transport() {
        #[derive(Debug, Clone)]
        struct TestTransport;

        impl launchdarkly_sdk_transport::HttpTransport for TestTransport {
            fn request(&self, _request: Request<Option<Bytes>>) -> ResponseFuture {
                // Building the data source must not issue any request.
                unreachable!();
            }
        }

        let mut builder = StreamingDataSourceBuilder::new();
        builder.transport(TestTransport);

        let endpoints = crate::ServiceEndpointsBuilder::new().build().unwrap();
        assert!(builder.build(&endpoints, "test", None).is_ok());
    }

    #[test]
    fn default_polling_builder_has_correct_defaults() {
        let builder = PollingDataSourceBuilder::<HyperTransport>::new();
        assert_eq!(builder.poll_interval, MINIMUM_POLL_INTERVAL);
    }

    #[test]
    fn initial_reconnect_delay_for_streaming_can_be_adjusted() {
        let mut builder = StreamingDataSourceBuilder::<HyperTransport>::new();
        builder.initial_reconnect_delay(Duration::from_secs(1234));
        assert_eq!(builder.initial_reconnect_delay, Duration::from_secs(1234));
    }

    #[test]
    fn poll_interval_below_minimum_is_clamped() {
        let mut builder = PollingDataSourceBuilder::<HyperTransport>::new();
        builder.poll_interval(MINIMUM_POLL_INTERVAL - Duration::from_secs(1));
        assert_eq!(builder.poll_interval, MINIMUM_POLL_INTERVAL);
    }

    #[test]
    fn poll_interval_above_minimum_is_kept() {
        let requested = MINIMUM_POLL_INTERVAL + Duration::from_secs(1);

        let mut builder = PollingDataSourceBuilder::<HyperTransport>::new();
        builder.poll_interval(requested);
        assert_eq!(builder.poll_interval, requested);
    }
}