statsig_rust/specs_adapter/
statsig_customized_specs_adapter.rs

1use std::{sync::Arc, time::Duration};
2
3use super::statsig_data_store_specs_adapter::StatsigDataStoreSpecsAdapter;
4use super::StatsigHttpSpecsAdapter;
5use super::{SpecAdapterConfig, SpecsAdapterType};
6use crate::data_store_interface::DataStoreTrait;
7use crate::hashing::HashUtil;
8use crate::{log_i, log_w, SpecsAdapter, SpecsUpdateListener, StatsigOptions};
9use crate::{StatsigErr, StatsigRuntime};
10use async_trait::async_trait;
11
12#[cfg(feature = "with_grpc")]
13use super::statsig_grpc_specs_adapter::StatsigGrpcSpecsAdapter;
14
15#[cfg(not(feature = "with_grpc"))]
16use crate::log_e;
17
18const TAG: &str = stringify!(StatsigCustomizedSpecsAdapter);
19
20pub struct StatsigCustomizedSpecsAdapter {
21    adapters: Vec<Arc<dyn SpecsAdapter>>,
22}
23
24impl StatsigCustomizedSpecsAdapter {
25    pub fn new_from_config(
26        sdk_key: &str,
27        configs: Vec<SpecAdapterConfig>,
28        options: &StatsigOptions,
29        hashing: &HashUtil,
30    ) -> Self {
31        let mut adapters: Vec<Arc<dyn SpecsAdapter>> = Vec::new();
32        for config in &configs {
33            match config.adapter_type {
34                SpecsAdapterType::NetworkGrpcWebsocket => {
35                    if let Some(adapter) = Self::create_grpc_adapter(sdk_key, config, options) {
36                        adapters.push(adapter);
37                    }
38                }
39                SpecsAdapterType::NetworkHttp => {
40                    // Since strategies is an order list, we will just use i
41                    adapters.push(Arc::new(StatsigHttpSpecsAdapter::new(
42                        sdk_key,
43                        config.specs_url.as_ref(),
44                        options.fallback_to_statsig_api.unwrap_or(false),
45                        options.specs_sync_interval_ms,
46                        options.disable_network,
47                    )));
48                }
49                SpecsAdapterType::DataStore => match options.data_store.clone() {
50                    Some(data_store) => {
51                        adapters.push(Arc::new(StatsigDataStoreSpecsAdapter::new(
52                            sdk_key,
53                            data_store,
54                            hashing,
55                            options.specs_sync_interval_ms,
56                        )));
57                    }
58                    None => log_w!(TAG, "Datastore is not present for syncing spec"),
59                },
60            }
61        }
62
63        StatsigCustomizedSpecsAdapter { adapters }
64    }
65
66    pub fn new_from_data_store(
67        sdk_key: &str,
68        data_store: Arc<dyn DataStoreTrait>,
69        options: &StatsigOptions,
70        hashing: &HashUtil,
71    ) -> Self {
72        let data_adapter_spec_adapter = StatsigDataStoreSpecsAdapter::new(
73            sdk_key,
74            data_store,
75            hashing,
76            options.specs_sync_interval_ms,
77        );
78        let http_adapter = StatsigHttpSpecsAdapter::new(
79            sdk_key,
80            options.specs_url.as_ref(),
81            options.fallback_to_statsig_api.unwrap_or(false),
82            options.specs_sync_interval_ms,
83            options.disable_network,
84        );
85        let adapters: Vec<Arc<dyn SpecsAdapter>> =
86            vec![Arc::new(data_adapter_spec_adapter), Arc::new(http_adapter)];
87        StatsigCustomizedSpecsAdapter { adapters }
88    }
89
90    #[cfg(feature = "with_grpc")]
91    fn create_grpc_adapter(
92        sdk_key: &str,
93        config: &SpecAdapterConfig,
94        options: &StatsigOptions,
95    ) -> Option<Arc<dyn SpecsAdapter>> {
96        Some(Arc::new(StatsigGrpcSpecsAdapter::new(
97            sdk_key,
98            config,
99            options.disable_network,
100        )))
101    }
102
103    #[cfg(not(feature = "with_grpc"))]
104    fn create_grpc_adapter(
105        _sdk_key: &str,
106        _config: &SpecAdapterConfig,
107        _options: &StatsigOptions,
108    ) -> Option<Arc<dyn SpecsAdapter>> {
109        log_e!(
110            TAG,
111            "Trying to use grpc websocket adapter but with grpc feature is not enabled"
112        );
113        None
114    }
115}
116
117#[async_trait]
118impl SpecsAdapter for StatsigCustomizedSpecsAdapter {
119    async fn start(
120        self: Arc<Self>,
121        statsig_runtime: &Arc<StatsigRuntime>,
122    ) -> Result<(), StatsigErr> {
123        for adapter in &self.adapters {
124            match adapter.to_owned().start(statsig_runtime).await {
125                Ok(()) => {
126                    return Ok(());
127                }
128                Err(e) => {
129                    log_w!(
130                        TAG,
131                        "Failed to initialize from {} adapter: {:?}",
132                        adapter.get_type_name(),
133                        e
134                    );
135                    // Carry on
136                }
137            }
138        }
139        return Err(StatsigErr::UnstartedAdapter(
140            "Failed to start any adapters".to_string(),
141        ));
142    }
143
144    fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
145        for adapter in &self.adapters {
146            adapter.initialize(listener.clone());
147        }
148    }
149
150    async fn schedule_background_sync(
151        self: Arc<Self>,
152        statsig_runtime: &Arc<StatsigRuntime>,
153    ) -> Result<(), StatsigErr> {
154        // TODO: we probably should have another option for config sync sources, but for now, we only have one
155        for adapter in &self.adapters {
156            match adapter
157                .to_owned()
158                .schedule_background_sync(statsig_runtime)
159                .await
160            {
161                Ok(()) => return Ok(()),
162                Err(_) => {
163                    // Carry on and try  next adapter
164                    log_i!(
165                        TAG,
166                        "Skipping schedule bg sync for {}",
167                        adapter.get_type_name()
168                    );
169                }
170            }
171        }
172        Ok(())
173    }
174
175    async fn shutdown(
176        &self,
177        timeout: Duration,
178        statsig_runtime: &Arc<StatsigRuntime>,
179    ) -> Result<(), StatsigErr> {
180        let timeout_for_each = timeout
181            .checked_div(self.adapters.len() as u32)
182            .unwrap_or(timeout);
183        for adapter in &self.adapters {
184            let _ = adapter.shutdown(timeout_for_each, statsig_runtime).await;
185        }
186        Ok(())
187    }
188
189    fn get_type_name(&self) -> String {
190        stringify!(StatsigCustomizedSpecsAdapter).to_string()
191    }
192}