statsig_rust/specs_adapter/
statsig_customized_specs_adapter.rs

1use std::{sync::Arc, time::Duration};
2
3use super::statsig_data_store_specs_adapter::StatsigDataStoreSpecsAdapter;
4use super::StatsigHttpSpecsAdapter;
5use super::{SpecAdapterConfig, SpecsAdapterType};
6use crate::data_store_interface::DataStoreTrait;
7use crate::hashing::HashUtil;
8use crate::{log_i, log_w, SpecsAdapter, SpecsUpdateListener, StatsigOptions};
9use crate::{StatsigErr, StatsigRuntime};
10use async_trait::async_trait;
11
12#[cfg(feature = "with_grpc")]
13use super::statsig_grpc_specs_adapter::StatsigGrpcSpecsAdapter;
14
15#[cfg(not(feature = "with_grpc"))]
16use crate::log_e;
17
/// Log tag used by every log line emitted from this module.
const TAG: &str = "StatsigCustomizedSpecsAdapter";
19
20pub struct StatsigCustomizedSpecsAdapter {
21    adapters: Vec<Arc<dyn SpecsAdapter>>,
22}
23
24impl StatsigCustomizedSpecsAdapter {
25    pub fn new_from_config(
26        sdk_key: &str,
27        configs: Vec<SpecAdapterConfig>,
28        options: &StatsigOptions,
29        hashing: &HashUtil,
30    ) -> Self {
31        let mut adapters: Vec<Arc<dyn SpecsAdapter>> = Vec::new();
32        for config in &configs {
33            match config.adapter_type {
34                SpecsAdapterType::NetworkGrpcWebsocket => {
35                    if let Some(adapter) = Self::create_grpc_adapter(sdk_key, config) {
36                        adapters.push(adapter);
37                    }
38                }
39                SpecsAdapterType::NetworkHttp => {
40                    // Since strategies is an order list, we will just use i
41                    adapters.push(Arc::new(StatsigHttpSpecsAdapter::new(
42                        sdk_key,
43                        config.specs_url.as_ref(),
44                        options.fallback_to_statsig_api.unwrap_or(false),
45                        options.specs_sync_interval_ms,
46                    )));
47                }
48                SpecsAdapterType::DataStore => match options.data_store.clone() {
49                    Some(data_store) => {
50                        adapters.push(Arc::new(StatsigDataStoreSpecsAdapter::new(
51                            sdk_key,
52                            data_store,
53                            hashing,
54                            options.specs_sync_interval_ms,
55                        )));
56                    }
57                    None => log_w!(TAG, "Datastore is not present for syncing spec"),
58                },
59            }
60        }
61
62        StatsigCustomizedSpecsAdapter { adapters }
63    }
64
65    pub fn new_from_data_store(
66        sdk_key: &str,
67        data_store: Arc<dyn DataStoreTrait>,
68        options: &StatsigOptions,
69        hashing: &HashUtil,
70    ) -> Self {
71        let data_adapter_spec_adapter = StatsigDataStoreSpecsAdapter::new(
72            sdk_key,
73            data_store,
74            hashing,
75            options.specs_sync_interval_ms,
76        );
77        let http_adapter = StatsigHttpSpecsAdapter::new(
78            sdk_key,
79            options.specs_url.as_ref(),
80            options.fallback_to_statsig_api.unwrap_or(false),
81            options.specs_sync_interval_ms,
82        );
83        let adapters: Vec<Arc<dyn SpecsAdapter>> =
84            vec![Arc::new(data_adapter_spec_adapter), Arc::new(http_adapter)];
85        StatsigCustomizedSpecsAdapter { adapters }
86    }
87
88    #[cfg(feature = "with_grpc")]
89    fn create_grpc_adapter(
90        sdk_key: &str,
91        config: &SpecAdapterConfig,
92    ) -> Option<Arc<dyn SpecsAdapter>> {
93        Some(Arc::new(StatsigGrpcSpecsAdapter::new(sdk_key, config)))
94    }
95
96    #[cfg(not(feature = "with_grpc"))]
97    fn create_grpc_adapter(
98        _sdk_key: &str,
99        _config: &SpecAdapterConfig,
100    ) -> Option<Arc<dyn SpecsAdapter>> {
101        log_e!(
102            TAG,
103            "Trying to use grpc websocket adapter but with grpc feature is not enabled"
104        );
105        None
106    }
107}
108
109#[async_trait]
110impl SpecsAdapter for StatsigCustomizedSpecsAdapter {
111    async fn start(
112        self: Arc<Self>,
113        statsig_runtime: &Arc<StatsigRuntime>,
114    ) -> Result<(), StatsigErr> {
115        for adapter in &self.adapters {
116            match adapter.to_owned().start(statsig_runtime).await {
117                Ok(()) => {
118                    return Ok(());
119                }
120                _ => {
121                    log_w!(
122                        TAG,
123                        "Failed to initialize from {} adapter",
124                        adapter.get_type_name()
125                    );
126                    // Carry on
127                }
128            }
129        }
130        return Err(StatsigErr::NetworkError(
131            "Failed to start any adapters".to_string(),
132        ));
133    }
134
135    fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
136        for adapter in &self.adapters {
137            adapter.initialize(listener.clone());
138        }
139    }
140
141    async fn schedule_background_sync(
142        self: Arc<Self>,
143        statsig_runtime: &Arc<StatsigRuntime>,
144    ) -> Result<(), StatsigErr> {
145        // TODO: we probably should have another option for config sync sources, but for now, we only have one
146        for adapter in &self.adapters {
147            match adapter
148                .to_owned()
149                .schedule_background_sync(statsig_runtime)
150                .await
151            {
152                Ok(()) => return Ok(()),
153                Err(_) => {
154                    // Carry on and try  next adapter
155                    log_i!(
156                        TAG,
157                        "Skipping schedule bg sync for {}",
158                        adapter.get_type_name()
159                    );
160                }
161            }
162        }
163        Ok(())
164    }
165
166    async fn shutdown(
167        &self,
168        timeout: Duration,
169        statsig_runtime: &Arc<StatsigRuntime>,
170    ) -> Result<(), StatsigErr> {
171        let timeout_for_each = timeout
172            .checked_div(self.adapters.len() as u32)
173            .unwrap_or(timeout);
174        for adapter in &self.adapters {
175            let _ = adapter.shutdown(timeout_for_each, statsig_runtime).await;
176        }
177        Ok(())
178    }
179
180    fn get_type_name(&self) -> String {
181        stringify!(StatsigCustomizedSpecsAdapter).to_string()
182    }
183}