statsig_rust/specs_adapter/
statsig_customized_specs_adapter.rs

1use std::{sync::Arc, time::Duration};
2
3use super::statsig_data_store_specs_adapter::StatsigDataStoreSpecsAdapter;
4use super::StatsigHttpSpecsAdapter;
5use super::{SpecAdapterConfig, SpecsAdapterType};
6use crate::data_store_interface::DataStoreTrait;
7use crate::hashing::HashUtil;
8use crate::{log_i, log_w, SpecsAdapter, SpecsUpdateListener, StatsigOptions};
9use crate::{StatsigErr, StatsigRuntime};
10use async_trait::async_trait;
11
12#[cfg(feature = "with_grpc")]
13use super::statsig_grpc_specs_adapter::StatsigGrpcSpecsAdapter;
14
15#[cfg(not(feature = "with_grpc"))]
16use crate::log_e;
17
/// Log tag for this file; passed as the first argument to the `log_*!` macro calls below.
const TAG: &str = "StatsigCustomizedSpecsAdapter";
19
20pub struct StatsigCustomizedSpecsAdapter {
21    adapters: Vec<Arc<dyn SpecsAdapter>>,
22}
23
24impl StatsigCustomizedSpecsAdapter {
25    pub fn new_from_config(
26        sdk_key: &str,
27        configs: Vec<SpecAdapterConfig>,
28        options: &StatsigOptions,
29        hashing: &HashUtil,
30    ) -> Self {
31        let mut adapters: Vec<Arc<dyn SpecsAdapter>> = Vec::new();
32        for config in &configs {
33            match config.adapter_type {
34                SpecsAdapterType::NetworkGrpcWebsocket => {
35                    if let Some(adapter) = Self::create_grpc_adapter(sdk_key, config, options) {
36                        adapters.push(adapter);
37                    }
38                }
39                SpecsAdapterType::NetworkHttp => {
40                    // Since strategies is an order list, we will just use i
41                    adapters.push(Arc::new(StatsigHttpSpecsAdapter::new(
42                        sdk_key,
43                        Some(options),
44                    )));
45                }
46                SpecsAdapterType::DataStore => match options.data_store.clone() {
47                    Some(data_store) => {
48                        adapters.push(Arc::new(StatsigDataStoreSpecsAdapter::new(
49                            sdk_key,
50                            data_store,
51                            hashing,
52                            Some(options),
53                        )));
54                    }
55                    None => log_w!(TAG, "Datastore is not present for syncing spec"),
56                },
57            }
58        }
59
60        StatsigCustomizedSpecsAdapter { adapters }
61    }
62
63    pub fn new_from_data_store(
64        sdk_key: &str,
65        data_store: Arc<dyn DataStoreTrait>,
66        options: &StatsigOptions,
67        hashing: &HashUtil,
68    ) -> Self {
69        let data_adapter_spec_adapter =
70            StatsigDataStoreSpecsAdapter::new(sdk_key, data_store, hashing, Some(options));
71        let http_adapter = StatsigHttpSpecsAdapter::new(sdk_key, Some(options));
72        let adapters: Vec<Arc<dyn SpecsAdapter>> =
73            vec![Arc::new(data_adapter_spec_adapter), Arc::new(http_adapter)];
74        StatsigCustomizedSpecsAdapter { adapters }
75    }
76
77    #[cfg(feature = "with_grpc")]
78    fn create_grpc_adapter(
79        sdk_key: &str,
80        config: &SpecAdapterConfig,
81        options: &StatsigOptions,
82    ) -> Option<Arc<dyn SpecsAdapter>> {
83        Some(Arc::new(StatsigGrpcSpecsAdapter::new(
84            sdk_key,
85            config,
86            Some(options),
87        )))
88    }
89
90    #[cfg(not(feature = "with_grpc"))]
91    fn create_grpc_adapter(
92        _sdk_key: &str,
93        _config: &SpecAdapterConfig,
94        _options: &StatsigOptions,
95    ) -> Option<Arc<dyn SpecsAdapter>> {
96        log_e!(
97            TAG,
98            "Trying to use grpc websocket adapter but with grpc feature is not enabled"
99        );
100        None
101    }
102}
103
104#[async_trait]
105impl SpecsAdapter for StatsigCustomizedSpecsAdapter {
106    async fn start(
107        self: Arc<Self>,
108        statsig_runtime: &Arc<StatsigRuntime>,
109    ) -> Result<(), StatsigErr> {
110        for adapter in &self.adapters {
111            match adapter.to_owned().start(statsig_runtime).await {
112                Ok(()) => {
113                    return Ok(());
114                }
115                Err(e) => {
116                    log_w!(
117                        TAG,
118                        "Failed to initialize from {} adapter: {:?}",
119                        adapter.get_type_name(),
120                        e
121                    );
122                    // Carry on
123                }
124            }
125        }
126        return Err(StatsigErr::UnstartedAdapter(
127            "Failed to start any adapters".to_string(),
128        ));
129    }
130
131    fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
132        for adapter in &self.adapters {
133            adapter.initialize(listener.clone());
134        }
135    }
136
137    async fn schedule_background_sync(
138        self: Arc<Self>,
139        statsig_runtime: &Arc<StatsigRuntime>,
140    ) -> Result<(), StatsigErr> {
141        // TODO: we probably should have another option for config sync sources, but for now, we only have one
142        for adapter in &self.adapters {
143            match adapter
144                .to_owned()
145                .schedule_background_sync(statsig_runtime)
146                .await
147            {
148                Ok(()) => return Ok(()),
149                Err(_) => {
150                    // Carry on and try  next adapter
151                    log_i!(
152                        TAG,
153                        "Skipping schedule bg sync for {}",
154                        adapter.get_type_name()
155                    );
156                }
157            }
158        }
159        Ok(())
160    }
161
162    async fn shutdown(
163        &self,
164        timeout: Duration,
165        statsig_runtime: &Arc<StatsigRuntime>,
166    ) -> Result<(), StatsigErr> {
167        let timeout_for_each = timeout
168            .checked_div(self.adapters.len() as u32)
169            .unwrap_or(timeout);
170        for adapter in &self.adapters {
171            let _ = adapter.shutdown(timeout_for_each, statsig_runtime).await;
172        }
173        Ok(())
174    }
175
176    fn get_type_name(&self) -> String {
177        stringify!(StatsigCustomizedSpecsAdapter).to_string()
178    }
179}