statsig_rust/specs_adapter/
statsig_customized_specs_adapter.rs

1use std::{sync::Arc, time::Duration};
2
3use super::statsig_data_store_specs_adapter::StatsigDataStoreSpecsAdapter;
4use super::StatsigHttpSpecsAdapter;
5use super::{SpecAdapterConfig, SpecsAdapterType};
6use crate::data_store_interface::DataStoreTrait;
7use crate::hashing::HashUtil;
8use crate::{log_i, log_w, SpecsAdapter, SpecsUpdateListener, StatsigOptions};
9use crate::{StatsigErr, StatsigRuntime};
10use async_trait::async_trait;
11
12#[cfg(feature = "with_grpc")]
13use super::statsig_grpc_specs_adapter::StatsigGrpcSpecsAdapter;
14
15#[cfg(not(feature = "with_grpc"))]
16use crate::log_e;
17
/// Tag passed as the first argument to the logging macros used in this file;
/// expands to the literal string `"StatsigCustomizedSpecsAdapter"`.
const TAG: &str = stringify!(StatsigCustomizedSpecsAdapter);
19
/// A composite [`SpecsAdapter`] that wraps an ordered list of other adapters.
///
/// The `SpecsAdapter` implementation for this type walks `adapters` in order:
/// `start` and `schedule_background_sync` stop at the first adapter that
/// succeeds, while `initialize` and `shutdown` are forwarded to every adapter.
pub struct StatsigCustomizedSpecsAdapter {
    /// Wrapped adapters, in the priority order in which they are tried.
    adapters: Vec<Arc<dyn SpecsAdapter>>,
}
23
24impl StatsigCustomizedSpecsAdapter {
25    pub fn new_from_config(
26        sdk_key: &str,
27        configs: Vec<SpecAdapterConfig>,
28        options: &StatsigOptions,
29        hashing: &HashUtil,
30    ) -> Self {
31        let mut adapters: Vec<Arc<dyn SpecsAdapter>> = Vec::new();
32        for config in &configs {
33            match config.adapter_type {
34                SpecsAdapterType::NetworkGrpcWebsocket => {
35                    if let Some(adapter) = Self::create_grpc_adapter(sdk_key, config, options) {
36                        adapters.push(adapter);
37                    }
38                }
39                SpecsAdapterType::NetworkHttp => {
40                    // Since strategies is an order list, we will just use i
41                    adapters.push(Arc::new(StatsigHttpSpecsAdapter::new(
42                        sdk_key,
43                        Some(options),
44                        config.specs_url.clone(),
45                    )));
46                }
47                SpecsAdapterType::DataStore => match options.data_store.clone() {
48                    Some(data_store) => {
49                        adapters.push(Arc::new(StatsigDataStoreSpecsAdapter::new(
50                            sdk_key,
51                            data_store,
52                            hashing,
53                            Some(options),
54                        )));
55                    }
56                    None => log_w!(TAG, "Datastore is not present for syncing spec"),
57                },
58            }
59        }
60
61        StatsigCustomizedSpecsAdapter { adapters }
62    }
63
64    pub fn new_from_data_store(
65        sdk_key: &str,
66        data_store: Arc<dyn DataStoreTrait>,
67        options: &StatsigOptions,
68        hashing: &HashUtil,
69    ) -> Self {
70        let data_adapter_spec_adapter =
71            StatsigDataStoreSpecsAdapter::new(sdk_key, data_store, hashing, Some(options));
72        let http_adapter = StatsigHttpSpecsAdapter::new(sdk_key, Some(options), None);
73        let adapters: Vec<Arc<dyn SpecsAdapter>> =
74            vec![Arc::new(data_adapter_spec_adapter), Arc::new(http_adapter)];
75        StatsigCustomizedSpecsAdapter { adapters }
76    }
77
78    #[cfg(feature = "with_grpc")]
79    fn create_grpc_adapter(
80        sdk_key: &str,
81        config: &SpecAdapterConfig,
82        options: &StatsigOptions,
83    ) -> Option<Arc<dyn SpecsAdapter>> {
84        Some(Arc::new(StatsigGrpcSpecsAdapter::new(
85            sdk_key,
86            config,
87            Some(options),
88        )))
89    }
90
91    #[cfg(not(feature = "with_grpc"))]
92    fn create_grpc_adapter(
93        _sdk_key: &str,
94        _config: &SpecAdapterConfig,
95        _options: &StatsigOptions,
96    ) -> Option<Arc<dyn SpecsAdapter>> {
97        log_e!(
98            TAG,
99            "Trying to use grpc websocket adapter but with grpc feature is not enabled"
100        );
101        None
102    }
103}
104
105#[async_trait]
106impl SpecsAdapter for StatsigCustomizedSpecsAdapter {
107    async fn start(
108        self: Arc<Self>,
109        statsig_runtime: &Arc<StatsigRuntime>,
110    ) -> Result<(), StatsigErr> {
111        for adapter in &self.adapters {
112            match adapter.to_owned().start(statsig_runtime).await {
113                Ok(()) => {
114                    return Ok(());
115                }
116                Err(e) => {
117                    log_w!(
118                        TAG,
119                        "Failed to initialize from {} adapter: {:?}",
120                        adapter.get_type_name(),
121                        e
122                    );
123                    // Carry on
124                }
125            }
126        }
127        return Err(StatsigErr::UnstartedAdapter(
128            "Failed to start any adapters".to_string(),
129        ));
130    }
131
132    fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
133        for adapter in &self.adapters {
134            adapter.initialize(listener.clone());
135        }
136    }
137
138    async fn schedule_background_sync(
139        self: Arc<Self>,
140        statsig_runtime: &Arc<StatsigRuntime>,
141    ) -> Result<(), StatsigErr> {
142        // TODO: we probably should have another option for config sync sources, but for now, we only have one
143        for adapter in &self.adapters {
144            match adapter
145                .to_owned()
146                .schedule_background_sync(statsig_runtime)
147                .await
148            {
149                Ok(()) => return Ok(()),
150                Err(_) => {
151                    // Carry on and try  next adapter
152                    log_i!(
153                        TAG,
154                        "Skipping schedule bg sync for {}",
155                        adapter.get_type_name()
156                    );
157                }
158            }
159        }
160        Ok(())
161    }
162
163    async fn shutdown(
164        &self,
165        timeout: Duration,
166        statsig_runtime: &Arc<StatsigRuntime>,
167    ) -> Result<(), StatsigErr> {
168        let timeout_for_each = timeout
169            .checked_div(self.adapters.len() as u32)
170            .unwrap_or(timeout);
171        for adapter in &self.adapters {
172            let _ = adapter.shutdown(timeout_for_each, statsig_runtime).await;
173        }
174        Ok(())
175    }
176
177    fn get_type_name(&self) -> String {
178        stringify!(StatsigCustomizedSpecsAdapter).to_string()
179    }
180}