1use super::service_endpoints;
2use crate::data_source::{DataSource, NullDataSource, PollingDataSource, StreamingDataSource};
3use crate::feature_requester_builders::{FeatureRequesterFactory, HttpFeatureRequesterBuilder};
4use launchdarkly_sdk_transport::HttpTransport;
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use thiserror::Error;
8
9#[cfg(test)]
10use super::data_source;
11
12#[non_exhaustive]
14#[derive(Debug, Error)]
15pub enum BuildError {
16 #[error("data source factory failed to build: {0}")]
18 InvalidConfig(String),
19}
20
/// Reconnect delay a freshly created [`StreamingDataSourceBuilder`] starts with.
const DEFAULT_INITIAL_RECONNECT_DELAY: Duration = Duration::from_secs(1);

/// Smallest poll interval a [`PollingDataSourceBuilder`] will accept; it is
/// also the interval a freshly created polling builder starts with.
const MINIMUM_POLL_INTERVAL: Duration = Duration::from_secs(30);
23
24pub trait DataSourceFactory {
26 fn build(
27 &self,
28 endpoints: &service_endpoints::ServiceEndpoints,
29 sdk_key: &str,
30 tags: Option<String>,
31 ) -> Result<Arc<dyn DataSource>, BuildError>;
32
33 fn set_instance_id(&mut self, _instance_id: String) {}
39
40 fn to_owned(&self) -> Box<dyn DataSourceFactory>;
41}
42
43#[derive(Clone)]
63pub struct StreamingDataSourceBuilder<T: launchdarkly_sdk_transport::HttpTransport> {
64 initial_reconnect_delay: Duration,
65 transport: Option<T>,
66 instance_id: Option<String>,
67}
68
69impl<T: launchdarkly_sdk_transport::HttpTransport> StreamingDataSourceBuilder<T> {
70 pub fn new() -> Self {
72 Self {
73 initial_reconnect_delay: DEFAULT_INITIAL_RECONNECT_DELAY,
74 transport: None,
75 instance_id: None,
76 }
77 }
78
79 pub fn initial_reconnect_delay(&mut self, duration: Duration) -> &mut Self {
81 self.initial_reconnect_delay = duration;
82 self
83 }
84
85 pub fn transport(&mut self, transport: T) -> &mut Self {
90 self.transport = Some(transport);
91 self
92 }
93}
94
95impl<T: launchdarkly_sdk_transport::HttpTransport> DataSourceFactory
96 for StreamingDataSourceBuilder<T>
97{
98 fn build(
99 &self,
100 endpoints: &service_endpoints::ServiceEndpoints,
101 sdk_key: &str,
102 tags: Option<String>,
103 ) -> Result<Arc<dyn DataSource>, BuildError> {
104 let instance_id = self.instance_id.as_deref();
105 let data_source_result = match &self.transport {
106 #[cfg(any(
107 feature = "hyper-rustls-native-roots",
108 feature = "hyper-rustls-webpki-roots",
109 feature = "native-tls"
110 ))]
111 None => {
112 let transport =
113 launchdarkly_sdk_transport::HyperTransport::new_https().map_err(|e| {
114 BuildError::InvalidConfig(format!(
115 "failed to create default https transport: {e:?}"
116 ))
117 })?;
118 Ok(StreamingDataSource::new(
119 endpoints.streaming_base_url(),
120 sdk_key,
121 self.initial_reconnect_delay,
122 &tags,
123 instance_id,
124 transport,
125 ))
126 }
127 #[cfg(not(any(
128 feature = "hyper-rustls-native-roots",
129 feature = "hyper-rustls-webpki-roots",
130 feature = "native-tls"
131 )))]
132 None => Err(BuildError::InvalidConfig(
133 "https connector required when hyper-rustls-native-roots, hyper-rustls-webpki-roots, or native-tls features are disabled".into(),
134 )),
135 Some(transport) => Ok(StreamingDataSource::new(
136 endpoints.streaming_base_url(),
137 sdk_key,
138 self.initial_reconnect_delay,
139 &tags,
140 instance_id,
141 transport.clone(),
142 )),
143 };
144 let data_source = data_source_result?
145 .map_err(|e| BuildError::InvalidConfig(format!("invalid stream_base_url: {e:?}")))?;
146 Ok(Arc::new(data_source))
147 }
148
149 fn set_instance_id(&mut self, instance_id: String) {
150 self.instance_id = Some(instance_id);
151 }
152
153 fn to_owned(&self) -> Box<dyn DataSourceFactory> {
154 Box::new(self.clone())
155 }
156}
157
158impl<T: launchdarkly_sdk_transport::HttpTransport> Default for StreamingDataSourceBuilder<T> {
159 fn default() -> Self {
160 StreamingDataSourceBuilder::new()
161 }
162}
163
/// Field-less builder whose [`DataSourceFactory`] implementation produces a
/// [`NullDataSource`].
#[derive(Clone)]
pub struct NullDataSourceBuilder {}
166
167impl NullDataSourceBuilder {
168 pub fn new() -> Self {
169 Self {}
170 }
171}
172
173impl DataSourceFactory for NullDataSourceBuilder {
174 fn build(
175 &self,
176 _: &service_endpoints::ServiceEndpoints,
177 _: &str,
178 _: Option<String>,
179 ) -> Result<Arc<dyn DataSource>, BuildError> {
180 Ok(Arc::new(NullDataSource::new()))
181 }
182
183 fn to_owned(&self) -> Box<dyn DataSourceFactory> {
184 Box::new(self.clone())
185 }
186}
187
188impl Default for NullDataSourceBuilder {
189 fn default() -> Self {
190 NullDataSourceBuilder::new()
191 }
192}
193
194#[derive(Clone)]
217pub struct PollingDataSourceBuilder<T: HttpTransport = launchdarkly_sdk_transport::HyperTransport> {
218 poll_interval: Duration,
219 transport: Option<T>,
220 instance_id: Option<String>,
221}
222
223impl<T: HttpTransport> PollingDataSourceBuilder<T> {
247 pub fn new() -> Self {
249 Self {
250 poll_interval: MINIMUM_POLL_INTERVAL,
251 transport: None,
252 instance_id: None,
253 }
254 }
255
256 pub fn poll_interval(&mut self, poll_interval: Duration) -> &mut Self {
261 self.poll_interval = std::cmp::max(poll_interval, MINIMUM_POLL_INTERVAL);
262 self
263 }
264
265 pub fn transport(&mut self, transport: T) -> &mut Self {
270 self.transport = Some(transport);
271 self
272 }
273}
274
275impl<T: HttpTransport> DataSourceFactory for PollingDataSourceBuilder<T> {
276 fn build(
277 &self,
278 endpoints: &service_endpoints::ServiceEndpoints,
279 sdk_key: &str,
280 tags: Option<String>,
281 ) -> Result<Arc<dyn DataSource>, BuildError> {
282 let instance_id = self.instance_id.as_deref();
283 let feature_requester_builder: Result<Box<dyn FeatureRequesterFactory>, BuildError> =
284 match &self.transport {
285 #[cfg(any(
286 feature = "hyper-rustls-native-roots",
287 feature = "hyper-rustls-webpki-roots",
288 feature = "native-tls"
289 ))]
290 None => {
291 let transport = launchdarkly_sdk_transport::HyperTransport::new_https()
292 .map_err(|e| {
293 BuildError::InvalidConfig(format!(
294 "failed to create default https transport: {e:?}"
295 ))
296 })?;
297 let mut builder = HttpFeatureRequesterBuilder::new(
298 endpoints.polling_base_url(),
299 sdk_key,
300 transport,
301 );
302 if let Some(instance_id) = instance_id {
303 builder = builder.with_instance_id(instance_id);
304 }
305 Ok(Box::new(builder))
306 }
307 #[cfg(not(any(
308 feature = "hyper-rustls-native-roots",
309 feature = "hyper-rustls-webpki-roots",
310 feature = "native-tls"
311 )))]
312 None => Err(BuildError::InvalidConfig(
313 "transport is required when hyper-rustls-native-roots, hyper-rustls-webpki-roots, or native-tls features are disabled".into(),
314 )),
315 Some(transport) => {
316 let mut builder = HttpFeatureRequesterBuilder::new(
317 endpoints.polling_base_url(),
318 sdk_key,
319 transport.clone(),
320 );
321 if let Some(instance_id) = instance_id {
322 builder = builder.with_instance_id(instance_id);
323 }
324 Ok(Box::new(builder))
325 }
326 };
327
328 let feature_requester_factory: Arc<Mutex<Box<dyn FeatureRequesterFactory>>> =
329 Arc::new(Mutex::new(feature_requester_builder?));
330
331 let data_source =
332 PollingDataSource::new(feature_requester_factory, self.poll_interval, tags);
333 Ok(Arc::new(data_source))
334 }
335
336 fn set_instance_id(&mut self, instance_id: String) {
337 self.instance_id = Some(instance_id);
338 }
339
340 fn to_owned(&self) -> Box<dyn DataSourceFactory> {
341 Box::new(self.clone())
342 }
343}
344
345impl<T: HttpTransport> Default for PollingDataSourceBuilder<T> {
346 fn default() -> Self {
347 PollingDataSourceBuilder::new()
348 }
349}
350
/// Test-only factory that hands back a preconfigured mock data source.
#[cfg(test)]
#[derive(Clone)]
pub(crate) struct MockDataSourceBuilder {
    /// Mock returned by `build`; `None` until `data_source` has been called.
    data_source: Option<Arc<data_source::MockDataSource>>,
}
357
#[cfg(test)]
impl MockDataSourceBuilder {
    /// Creates a builder with no mock data source configured yet.
    pub fn new() -> Self {
        Self { data_source: None }
    }

    /// Sets the mock data source that `build` will return and hands the
    /// builder back for chaining.
    pub fn data_source(&mut self, data_source: Arc<data_source::MockDataSource>) -> &mut Self {
        self.data_source = Some(data_source);
        self
    }
}
372
#[cfg(test)]
impl DataSourceFactory for MockDataSourceBuilder {
    /// Returns the configured mock data source; all arguments are ignored.
    ///
    /// # Panics
    ///
    /// Panics if no mock data source has been set on the builder.
    fn build(
        &self,
        _endpoints: &service_endpoints::ServiceEndpoints,
        _sdk_key: &str,
        _tags: Option<String>,
    ) -> Result<Arc<dyn DataSource>, BuildError> {
        let mock: Arc<data_source::MockDataSource> = self.data_source.clone().unwrap();
        Ok(mock)
    }

    /// Returns a boxed clone of this builder.
    fn to_owned(&self) -> Box<dyn DataSourceFactory> {
        let copy = self.clone();
        Box::new(copy)
    }
}
388
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use launchdarkly_sdk_transport::{HyperTransport, Request, ResponseFuture};

    use super::*;

    /// A new streaming builder starts with the default reconnect delay.
    #[test]
    fn default_stream_builder_has_correct_defaults() {
        let builder = StreamingDataSourceBuilder::<HyperTransport>::new();

        let expected = DEFAULT_INITIAL_RECONNECT_DELAY;
        assert_eq!(builder.initial_reconnect_delay, expected);
    }

    /// Building succeeds when a caller-supplied transport is configured.
    #[test]
    fn stream_builder_can_use_custom_transport() {
        #[derive(Debug, Clone)]
        struct TestTransport;

        impl launchdarkly_sdk_transport::HttpTransport for TestTransport {
            // Building the data source must not issue any request.
            fn request(&self, _request: Request<Option<Bytes>>) -> ResponseFuture {
                unreachable!();
            }
        }

        let endpoints = crate::ServiceEndpointsBuilder::new().build().unwrap();

        let mut builder = StreamingDataSourceBuilder::new();
        builder.transport(TestTransport);

        let outcome = builder.build(&endpoints, "test", None);
        assert!(outcome.is_ok());
    }

    /// A new polling builder starts at the minimum poll interval.
    #[test]
    fn default_polling_builder_has_correct_defaults() {
        let builder: PollingDataSourceBuilder<HyperTransport> = PollingDataSourceBuilder::new();
        assert_eq!(builder.poll_interval, MINIMUM_POLL_INTERVAL);
    }

    /// The streaming reconnect delay can be overridden through the builder.
    #[test]
    fn initial_reconnect_delay_for_streaming_can_be_adjusted() {
        let custom_delay = Duration::from_secs(1234);

        let mut builder = StreamingDataSourceBuilder::<HyperTransport>::new();
        builder.initial_reconnect_delay(custom_delay);

        assert_eq!(builder.initial_reconnect_delay, custom_delay);
    }
}