statsig_rust/specs_adapter/
statsig_customized_specs_adapter.rs1use std::{sync::Arc, time::Duration};
2
3use super::statsig_data_store_specs_adapter::StatsigDataStoreSpecsAdapter;
4use super::StatsigHttpSpecsAdapter;
5use super::{SpecAdapterConfig, SpecsAdapterType};
6use crate::data_store_interface::DataStoreTrait;
7use crate::hashing::HashUtil;
8use crate::{log_i, log_w, SpecsAdapter, SpecsUpdateListener, StatsigOptions};
9use crate::{StatsigErr, StatsigRuntime};
10use async_trait::async_trait;
11
12#[cfg(feature = "with_grpc")]
13use super::statsig_grpc_specs_adapter::StatsigGrpcSpecsAdapter;
14
15#[cfg(not(feature = "with_grpc"))]
16use crate::log_e;
17
/// Log tag passed to the `log_*!` macros in this file; equal to the adapter's type name.
const TAG: &str = "StatsigCustomizedSpecsAdapter";
19
20pub struct StatsigCustomizedSpecsAdapter {
21 adapters: Vec<Arc<dyn SpecsAdapter>>,
22}
23
24impl StatsigCustomizedSpecsAdapter {
25 pub fn new_from_config(
26 sdk_key: &str,
27 configs: Vec<SpecAdapterConfig>,
28 options: &StatsigOptions,
29 hashing: &HashUtil,
30 ) -> Self {
31 let mut adapters: Vec<Arc<dyn SpecsAdapter>> = Vec::new();
32 for config in &configs {
33 match config.adapter_type {
34 SpecsAdapterType::NetworkGrpcWebsocket => {
35 if let Some(adapter) = Self::create_grpc_adapter(sdk_key, config, options) {
36 adapters.push(adapter);
37 }
38 }
39 SpecsAdapterType::NetworkHttp => {
40 adapters.push(Arc::new(StatsigHttpSpecsAdapter::new(
42 sdk_key,
43 Some(options),
44 config.specs_url.clone(),
45 )));
46 }
47 SpecsAdapterType::DataStore => match options.data_store.clone() {
48 Some(data_store) => {
49 adapters.push(Arc::new(StatsigDataStoreSpecsAdapter::new(
50 sdk_key,
51 data_store,
52 hashing,
53 Some(options),
54 )));
55 }
56 None => log_w!(TAG, "Datastore is not present for syncing spec"),
57 },
58 }
59 }
60
61 StatsigCustomizedSpecsAdapter { adapters }
62 }
63
64 pub fn new_from_data_store(
65 sdk_key: &str,
66 data_store: Arc<dyn DataStoreTrait>,
67 options: &StatsigOptions,
68 hashing: &HashUtil,
69 ) -> Self {
70 let data_adapter_spec_adapter =
71 StatsigDataStoreSpecsAdapter::new(sdk_key, data_store, hashing, Some(options));
72 let http_adapter = StatsigHttpSpecsAdapter::new(sdk_key, Some(options), None);
73 let adapters: Vec<Arc<dyn SpecsAdapter>> =
74 vec![Arc::new(data_adapter_spec_adapter), Arc::new(http_adapter)];
75 StatsigCustomizedSpecsAdapter { adapters }
76 }
77
78 #[cfg(feature = "with_grpc")]
79 fn create_grpc_adapter(
80 sdk_key: &str,
81 config: &SpecAdapterConfig,
82 options: &StatsigOptions,
83 ) -> Option<Arc<dyn SpecsAdapter>> {
84 Some(Arc::new(StatsigGrpcSpecsAdapter::new(
85 sdk_key,
86 config,
87 Some(options),
88 )))
89 }
90
91 #[cfg(not(feature = "with_grpc"))]
92 fn create_grpc_adapter(
93 _sdk_key: &str,
94 _config: &SpecAdapterConfig,
95 _options: &StatsigOptions,
96 ) -> Option<Arc<dyn SpecsAdapter>> {
97 log_e!(
98 TAG,
99 "Trying to use grpc websocket adapter but with grpc feature is not enabled"
100 );
101 None
102 }
103}
104
105#[async_trait]
106impl SpecsAdapter for StatsigCustomizedSpecsAdapter {
107 async fn start(
108 self: Arc<Self>,
109 statsig_runtime: &Arc<StatsigRuntime>,
110 ) -> Result<(), StatsigErr> {
111 for adapter in &self.adapters {
112 match adapter.to_owned().start(statsig_runtime).await {
113 Ok(()) => {
114 return Ok(());
115 }
116 Err(e) => {
117 log_w!(
118 TAG,
119 "Failed to initialize from {} adapter: {:?}",
120 adapter.get_type_name(),
121 e
122 );
123 }
125 }
126 }
127 return Err(StatsigErr::UnstartedAdapter(
128 "Failed to start any adapters".to_string(),
129 ));
130 }
131
132 fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
133 for adapter in &self.adapters {
134 adapter.initialize(listener.clone());
135 }
136 }
137
138 async fn schedule_background_sync(
139 self: Arc<Self>,
140 statsig_runtime: &Arc<StatsigRuntime>,
141 ) -> Result<(), StatsigErr> {
142 for adapter in &self.adapters {
144 match adapter
145 .to_owned()
146 .schedule_background_sync(statsig_runtime)
147 .await
148 {
149 Ok(()) => return Ok(()),
150 Err(_) => {
151 log_i!(
153 TAG,
154 "Skipping schedule bg sync for {}",
155 adapter.get_type_name()
156 );
157 }
158 }
159 }
160 Ok(())
161 }
162
163 async fn shutdown(
164 &self,
165 timeout: Duration,
166 statsig_runtime: &Arc<StatsigRuntime>,
167 ) -> Result<(), StatsigErr> {
168 let timeout_for_each = timeout
169 .checked_div(self.adapters.len() as u32)
170 .unwrap_or(timeout);
171 for adapter in &self.adapters {
172 let _ = adapter.shutdown(timeout_for_each, statsig_runtime).await;
173 }
174 Ok(())
175 }
176
177 fn get_type_name(&self) -> String {
178 stringify!(StatsigCustomizedSpecsAdapter).to_string()
179 }
180}