statsig_rust/specs_adapter/
statsig_customized_specs_adapter.rs1use std::{sync::Arc, time::Duration};
2
3use super::statsig_data_store_specs_adapter::StatsigDataStoreSpecsAdapter;
4use super::StatsigHttpSpecsAdapter;
5use super::{SpecAdapterConfig, SpecsAdapterType};
6use crate::data_store_interface::DataStoreTrait;
7use crate::hashing::HashUtil;
8use crate::{log_i, log_w, SpecsAdapter, SpecsUpdateListener, StatsigOptions};
9use crate::{StatsigErr, StatsigRuntime};
10use async_trait::async_trait;
11
12#[cfg(feature = "with_grpc")]
13use super::statsig_grpc_specs_adapter::StatsigGrpcSpecsAdapter;
14
15#[cfg(not(feature = "with_grpc"))]
16use crate::log_e;
17
18const TAG: &str = stringify!(StatsigCustomizedSpecsAdapter);
19
20pub struct StatsigCustomizedSpecsAdapter {
21 adapters: Vec<Arc<dyn SpecsAdapter>>,
22}
23
24impl StatsigCustomizedSpecsAdapter {
25 pub fn new_from_config(
26 sdk_key: &str,
27 configs: Vec<SpecAdapterConfig>,
28 options: &StatsigOptions,
29 hashing: &HashUtil,
30 ) -> Self {
31 let mut adapters: Vec<Arc<dyn SpecsAdapter>> = Vec::new();
32 for config in &configs {
33 match config.adapter_type {
34 SpecsAdapterType::NetworkGrpcWebsocket => {
35 if let Some(adapter) = Self::create_grpc_adapter(sdk_key, config, options) {
36 adapters.push(adapter);
37 }
38 }
39 SpecsAdapterType::NetworkHttp => {
40 adapters.push(Arc::new(StatsigHttpSpecsAdapter::new(
42 sdk_key,
43 config.specs_url.as_ref(),
44 options.fallback_to_statsig_api.unwrap_or(false),
45 options.specs_sync_interval_ms,
46 options.disable_network,
47 )));
48 }
49 SpecsAdapterType::DataStore => match options.data_store.clone() {
50 Some(data_store) => {
51 adapters.push(Arc::new(StatsigDataStoreSpecsAdapter::new(
52 sdk_key,
53 data_store,
54 hashing,
55 options.specs_sync_interval_ms,
56 )));
57 }
58 None => log_w!(TAG, "Datastore is not present for syncing spec"),
59 },
60 }
61 }
62
63 StatsigCustomizedSpecsAdapter { adapters }
64 }
65
66 pub fn new_from_data_store(
67 sdk_key: &str,
68 data_store: Arc<dyn DataStoreTrait>,
69 options: &StatsigOptions,
70 hashing: &HashUtil,
71 ) -> Self {
72 let data_adapter_spec_adapter = StatsigDataStoreSpecsAdapter::new(
73 sdk_key,
74 data_store,
75 hashing,
76 options.specs_sync_interval_ms,
77 );
78 let http_adapter = StatsigHttpSpecsAdapter::new(
79 sdk_key,
80 options.specs_url.as_ref(),
81 options.fallback_to_statsig_api.unwrap_or(false),
82 options.specs_sync_interval_ms,
83 options.disable_network,
84 );
85 let adapters: Vec<Arc<dyn SpecsAdapter>> =
86 vec![Arc::new(data_adapter_spec_adapter), Arc::new(http_adapter)];
87 StatsigCustomizedSpecsAdapter { adapters }
88 }
89
90 #[cfg(feature = "with_grpc")]
91 fn create_grpc_adapter(
92 sdk_key: &str,
93 config: &SpecAdapterConfig,
94 options: &StatsigOptions,
95 ) -> Option<Arc<dyn SpecsAdapter>> {
96 Some(Arc::new(StatsigGrpcSpecsAdapter::new(
97 sdk_key,
98 config,
99 options.disable_network,
100 )))
101 }
102
103 #[cfg(not(feature = "with_grpc"))]
104 fn create_grpc_adapter(
105 _sdk_key: &str,
106 _config: &SpecAdapterConfig,
107 _options: &StatsigOptions,
108 ) -> Option<Arc<dyn SpecsAdapter>> {
109 log_e!(
110 TAG,
111 "Trying to use grpc websocket adapter but with grpc feature is not enabled"
112 );
113 None
114 }
115}
116
117#[async_trait]
118impl SpecsAdapter for StatsigCustomizedSpecsAdapter {
119 async fn start(
120 self: Arc<Self>,
121 statsig_runtime: &Arc<StatsigRuntime>,
122 ) -> Result<(), StatsigErr> {
123 for adapter in &self.adapters {
124 match adapter.to_owned().start(statsig_runtime).await {
125 Ok(()) => {
126 return Ok(());
127 }
128 Err(e) => {
129 log_w!(
130 TAG,
131 "Failed to initialize from {} adapter: {:?}",
132 adapter.get_type_name(),
133 e
134 );
135 }
137 }
138 }
139 return Err(StatsigErr::UnstartedAdapter(
140 "Failed to start any adapters".to_string(),
141 ));
142 }
143
144 fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
145 for adapter in &self.adapters {
146 adapter.initialize(listener.clone());
147 }
148 }
149
150 async fn schedule_background_sync(
151 self: Arc<Self>,
152 statsig_runtime: &Arc<StatsigRuntime>,
153 ) -> Result<(), StatsigErr> {
154 for adapter in &self.adapters {
156 match adapter
157 .to_owned()
158 .schedule_background_sync(statsig_runtime)
159 .await
160 {
161 Ok(()) => return Ok(()),
162 Err(_) => {
163 log_i!(
165 TAG,
166 "Skipping schedule bg sync for {}",
167 adapter.get_type_name()
168 );
169 }
170 }
171 }
172 Ok(())
173 }
174
175 async fn shutdown(
176 &self,
177 timeout: Duration,
178 statsig_runtime: &Arc<StatsigRuntime>,
179 ) -> Result<(), StatsigErr> {
180 let timeout_for_each = timeout
181 .checked_div(self.adapters.len() as u32)
182 .unwrap_or(timeout);
183 for adapter in &self.adapters {
184 let _ = adapter.shutdown(timeout_for_each, statsig_runtime).await;
185 }
186 Ok(())
187 }
188
189 fn get_type_name(&self) -> String {
190 stringify!(StatsigCustomizedSpecsAdapter).to_string()
191 }
192}