Skip to main content

statsig_rust/specs_adapter/
statsig_customized_specs_adapter.rs

1use std::{sync::Arc, time::Duration};
2
3use super::statsig_data_store_specs_adapter::StatsigDataStoreSpecsAdapter;
4use super::StatsigHttpSpecsAdapter;
5use super::{SpecAdapterConfig, SpecsAdapterType};
6use crate::data_store_interface::DataStoreTrait;
7use crate::{log_i, log_w, SpecsAdapter, SpecsUpdateListener, StatsigOptions};
8use crate::{StatsigErr, StatsigRuntime};
9use async_trait::async_trait;
10
11#[cfg(feature = "with_grpc")]
12use super::statsig_grpc_specs_adapter::StatsigGrpcSpecsAdapter;
13
14#[cfg(not(feature = "with_grpc"))]
15use crate::log_e;
16
/// Log tag used by this module's `log_*!` calls; expands to the adapter's type name.
const TAG: &str = stringify!(StatsigCustomizedSpecsAdapter);
18
19pub struct StatsigCustomizedSpecsAdapter {
20    adapters: Vec<Arc<dyn SpecsAdapter>>,
21}
22
23impl StatsigCustomizedSpecsAdapter {
24    pub fn new_from_config(
25        sdk_key: &str,
26        data_store_key: &str,
27        configs: Vec<SpecAdapterConfig>,
28        options: &StatsigOptions,
29    ) -> Self {
30        let mut adapters: Vec<Arc<dyn SpecsAdapter>> = Vec::new();
31        for config in &configs {
32            match config.adapter_type {
33                SpecsAdapterType::NetworkGrpcWebsocket => {
34                    if let Some(adapter) = Self::create_grpc_adapter(sdk_key, config, options) {
35                        adapters.push(adapter);
36                    }
37                }
38                SpecsAdapterType::NetworkHttp => {
39                    // Since strategies is an order list, we will just use i
40                    adapters.push(Arc::new(StatsigHttpSpecsAdapter::new(
41                        sdk_key,
42                        Some(options),
43                        config.specs_url.clone(),
44                    )));
45                }
46                SpecsAdapterType::DataStore => match options.data_store.clone() {
47                    Some(data_store) => {
48                        adapters.push(Arc::new(StatsigDataStoreSpecsAdapter::new(
49                            data_store_key,
50                            data_store,
51                            Some(options),
52                        )));
53                    }
54                    None => log_w!(TAG, "Datastore is not present for syncing spec"),
55                },
56            }
57        }
58
59        StatsigCustomizedSpecsAdapter { adapters }
60    }
61
62    pub fn new_from_data_store(
63        sdk_key: &str,
64        data_store_key: &str,
65        data_store: Arc<dyn DataStoreTrait>,
66        options: &StatsigOptions,
67    ) -> Self {
68        let data_store_spec_adapter =
69            StatsigDataStoreSpecsAdapter::new(data_store_key, data_store, Some(options));
70        let http_adapter = StatsigHttpSpecsAdapter::new(sdk_key, Some(options), None);
71        let adapters: Vec<Arc<dyn SpecsAdapter>> =
72            vec![Arc::new(data_store_spec_adapter), Arc::new(http_adapter)];
73        StatsigCustomizedSpecsAdapter { adapters }
74    }
75
76    #[cfg(feature = "with_grpc")]
77    fn create_grpc_adapter(
78        sdk_key: &str,
79        config: &SpecAdapterConfig,
80        options: &StatsigOptions,
81    ) -> Option<Arc<dyn SpecsAdapter>> {
82        Some(Arc::new(StatsigGrpcSpecsAdapter::new(
83            sdk_key,
84            config,
85            Some(options),
86        )))
87    }
88
89    #[cfg(not(feature = "with_grpc"))]
90    fn create_grpc_adapter(
91        _sdk_key: &str,
92        _config: &SpecAdapterConfig,
93        _options: &StatsigOptions,
94    ) -> Option<Arc<dyn SpecsAdapter>> {
95        log_e!(
96            TAG,
97            "Trying to use grpc websocket adapter but with grpc feature is not enabled"
98        );
99        None
100    }
101}
102
103#[async_trait]
104impl SpecsAdapter for StatsigCustomizedSpecsAdapter {
105    async fn start(
106        self: Arc<Self>,
107        statsig_runtime: &Arc<StatsigRuntime>,
108    ) -> Result<(), StatsigErr> {
109        for adapter in &self.adapters {
110            match adapter.to_owned().start(statsig_runtime).await {
111                Ok(()) => {
112                    return Ok(());
113                }
114                Err(e) => {
115                    log_w!(
116                        TAG,
117                        "Failed to initialize from {} adapter: {:?}",
118                        adapter.get_type_name(),
119                        e
120                    );
121                    // Carry on
122                }
123            }
124        }
125        return Err(StatsigErr::UnstartedAdapter(
126            "Failed to start any adapters".to_string(),
127        ));
128    }
129
130    fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
131        for adapter in &self.adapters {
132            adapter.initialize(listener.clone());
133        }
134    }
135
136    async fn schedule_background_sync(
137        self: Arc<Self>,
138        statsig_runtime: &Arc<StatsigRuntime>,
139    ) -> Result<(), StatsigErr> {
140        // TODO: we probably should have another option for config sync sources, but for now, we only have one
141        for adapter in &self.adapters {
142            match adapter
143                .to_owned()
144                .schedule_background_sync(statsig_runtime)
145                .await
146            {
147                Ok(()) => return Ok(()),
148                Err(_) => {
149                    // Carry on and try  next adapter
150                    log_i!(
151                        TAG,
152                        "Skipping schedule bg sync for {}",
153                        adapter.get_type_name()
154                    );
155                }
156            }
157        }
158        Ok(())
159    }
160
161    async fn shutdown(
162        &self,
163        timeout: Duration,
164        statsig_runtime: &Arc<StatsigRuntime>,
165    ) -> Result<(), StatsigErr> {
166        let timeout_for_each = timeout
167            .checked_div(self.adapters.len() as u32)
168            .unwrap_or(timeout);
169        for adapter in &self.adapters {
170            let _ = adapter.shutdown(timeout_for_each, statsig_runtime).await;
171        }
172        Ok(())
173    }
174
175    fn get_type_name(&self) -> String {
176        stringify!(StatsigCustomizedSpecsAdapter).to_string()
177    }
178}