statsig_rust/specs_adapter/
statsig_customized_specs_adapter.rs1use std::{sync::Arc, time::Duration};
2
3use super::statsig_data_store_specs_adapter::StatsigDataStoreSpecsAdapter;
4use super::StatsigHttpSpecsAdapter;
5use super::{SpecAdapterConfig, SpecsAdapterType};
6use crate::data_store_interface::DataStoreTrait;
7use crate::hashing::HashUtil;
8use crate::{log_i, log_w, SpecsAdapter, SpecsUpdateListener, StatsigOptions};
9use crate::{StatsigErr, StatsigRuntime};
10use async_trait::async_trait;
11
12#[cfg(feature = "with_grpc")]
13use super::statsig_grpc_specs_adapter::StatsigGrpcSpecsAdapter;
14
15#[cfg(not(feature = "with_grpc"))]
16use crate::log_e;
17
18const TAG: &str = stringify!(StatsigCustomizedSpecsAdapter);
19
20pub struct StatsigCustomizedSpecsAdapter {
21 adapters: Vec<Arc<dyn SpecsAdapter>>,
22}
23
24impl StatsigCustomizedSpecsAdapter {
25 pub fn new_from_config(
26 sdk_key: &str,
27 configs: Vec<SpecAdapterConfig>,
28 options: &StatsigOptions,
29 hashing: &HashUtil,
30 ) -> Self {
31 let mut adapters: Vec<Arc<dyn SpecsAdapter>> = Vec::new();
32 for config in &configs {
33 match config.adapter_type {
34 SpecsAdapterType::NetworkGrpcWebsocket => {
35 if let Some(adapter) = Self::create_grpc_adapter(sdk_key, config) {
36 adapters.push(adapter);
37 }
38 }
39 SpecsAdapterType::NetworkHttp => {
40 adapters.push(Arc::new(StatsigHttpSpecsAdapter::new(
42 sdk_key,
43 config.specs_url.as_ref(),
44 options.fallback_to_statsig_api.unwrap_or(false),
45 options.specs_sync_interval_ms,
46 )));
47 }
48 SpecsAdapterType::DataStore => match options.data_store.clone() {
49 Some(data_store) => {
50 adapters.push(Arc::new(StatsigDataStoreSpecsAdapter::new(
51 sdk_key,
52 data_store,
53 hashing,
54 options.specs_sync_interval_ms,
55 )));
56 }
57 None => log_w!(TAG, "Datastore is not present for syncing spec"),
58 },
59 }
60 }
61
62 StatsigCustomizedSpecsAdapter { adapters }
63 }
64
65 pub fn new_from_data_store(
66 sdk_key: &str,
67 data_store: Arc<dyn DataStoreTrait>,
68 options: &StatsigOptions,
69 hashing: &HashUtil,
70 ) -> Self {
71 let data_adapter_spec_adapter = StatsigDataStoreSpecsAdapter::new(
72 sdk_key,
73 data_store,
74 hashing,
75 options.specs_sync_interval_ms,
76 );
77 let http_adapter = StatsigHttpSpecsAdapter::new(
78 sdk_key,
79 options.specs_url.as_ref(),
80 options.fallback_to_statsig_api.unwrap_or(false),
81 options.specs_sync_interval_ms,
82 );
83 let adapters: Vec<Arc<dyn SpecsAdapter>> =
84 vec![Arc::new(data_adapter_spec_adapter), Arc::new(http_adapter)];
85 StatsigCustomizedSpecsAdapter { adapters }
86 }
87
88 #[cfg(feature = "with_grpc")]
89 fn create_grpc_adapter(
90 sdk_key: &str,
91 config: &SpecAdapterConfig,
92 ) -> Option<Arc<dyn SpecsAdapter>> {
93 Some(Arc::new(StatsigGrpcSpecsAdapter::new(sdk_key, config)))
94 }
95
96 #[cfg(not(feature = "with_grpc"))]
97 fn create_grpc_adapter(
98 _sdk_key: &str,
99 _config: &SpecAdapterConfig,
100 ) -> Option<Arc<dyn SpecsAdapter>> {
101 log_e!(
102 TAG,
103 "Trying to use grpc websocket adapter but with grpc feature is not enabled"
104 );
105 None
106 }
107}
108
109#[async_trait]
110impl SpecsAdapter for StatsigCustomizedSpecsAdapter {
111 async fn start(
112 self: Arc<Self>,
113 statsig_runtime: &Arc<StatsigRuntime>,
114 ) -> Result<(), StatsigErr> {
115 for adapter in &self.adapters {
116 match adapter.to_owned().start(statsig_runtime).await {
117 Ok(()) => {
118 return Ok(());
119 }
120 _ => {
121 log_w!(
122 TAG,
123 "Failed to initialize from {} adapter",
124 adapter.get_type_name()
125 );
126 }
128 }
129 }
130 return Err(StatsigErr::NetworkError(
131 "Failed to start any adapters".to_string(),
132 ));
133 }
134
135 fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
136 for adapter in &self.adapters {
137 adapter.initialize(listener.clone());
138 }
139 }
140
141 async fn schedule_background_sync(
142 self: Arc<Self>,
143 statsig_runtime: &Arc<StatsigRuntime>,
144 ) -> Result<(), StatsigErr> {
145 for adapter in &self.adapters {
147 match adapter
148 .to_owned()
149 .schedule_background_sync(statsig_runtime)
150 .await
151 {
152 Ok(()) => return Ok(()),
153 Err(_) => {
154 log_i!(
156 TAG,
157 "Skipping schedule bg sync for {}",
158 adapter.get_type_name()
159 );
160 }
161 }
162 }
163 Ok(())
164 }
165
166 async fn shutdown(
167 &self,
168 timeout: Duration,
169 statsig_runtime: &Arc<StatsigRuntime>,
170 ) -> Result<(), StatsigErr> {
171 let timeout_for_each = timeout
172 .checked_div(self.adapters.len() as u32)
173 .unwrap_or(timeout);
174 for adapter in &self.adapters {
175 let _ = adapter.shutdown(timeout_for_each, statsig_runtime).await;
176 }
177 Ok(())
178 }
179
180 fn get_type_name(&self) -> String {
181 stringify!(StatsigCustomizedSpecsAdapter).to_string()
182 }
183}