statsig_rust/specs_adapter/
statsig_customized_specs_adapter.rs1use std::{sync::Arc, time::Duration};
2
3use super::statsig_data_store_specs_adapter::StatsigDataStoreSpecsAdapter;
4use super::StatsigHttpSpecsAdapter;
5use super::{SpecAdapterConfig, SpecsAdapterType};
6use crate::data_store_interface::DataStoreTrait;
7use crate::{log_i, log_w, SpecsAdapter, SpecsUpdateListener, StatsigOptions};
8use crate::{StatsigErr, StatsigRuntime};
9use async_trait::async_trait;
10
11#[cfg(feature = "with_grpc")]
12use super::statsig_grpc_specs_adapter::StatsigGrpcSpecsAdapter;
13
14#[cfg(not(feature = "with_grpc"))]
15use crate::log_e;
16
/// Tag attached to every log line emitted from this file.
const TAG: &str = "StatsigCustomizedSpecsAdapter";
18
19pub struct StatsigCustomizedSpecsAdapter {
20 adapters: Vec<Arc<dyn SpecsAdapter>>,
21}
22
23impl StatsigCustomizedSpecsAdapter {
24 pub fn new_from_config(
25 sdk_key: &str,
26 data_store_key: &str,
27 configs: Vec<SpecAdapterConfig>,
28 options: &StatsigOptions,
29 ) -> Self {
30 let mut adapters: Vec<Arc<dyn SpecsAdapter>> = Vec::new();
31 for config in &configs {
32 match config.adapter_type {
33 SpecsAdapterType::NetworkGrpcWebsocket => {
34 if let Some(adapter) = Self::create_grpc_adapter(sdk_key, config, options) {
35 adapters.push(adapter);
36 }
37 }
38 SpecsAdapterType::NetworkHttp => {
39 adapters.push(Arc::new(StatsigHttpSpecsAdapter::new(
41 sdk_key,
42 Some(options),
43 config.specs_url.clone(),
44 )));
45 }
46 SpecsAdapterType::DataStore => match options.data_store.clone() {
47 Some(data_store) => {
48 adapters.push(Arc::new(StatsigDataStoreSpecsAdapter::new(
49 data_store_key,
50 data_store,
51 Some(options),
52 )));
53 }
54 None => log_w!(TAG, "Datastore is not present for syncing spec"),
55 },
56 }
57 }
58
59 StatsigCustomizedSpecsAdapter { adapters }
60 }
61
62 pub fn new_from_data_store(
63 sdk_key: &str,
64 data_store_key: &str,
65 data_store: Arc<dyn DataStoreTrait>,
66 options: &StatsigOptions,
67 ) -> Self {
68 let data_store_spec_adapter =
69 StatsigDataStoreSpecsAdapter::new(data_store_key, data_store, Some(options));
70 let http_adapter = StatsigHttpSpecsAdapter::new(sdk_key, Some(options), None);
71 let adapters: Vec<Arc<dyn SpecsAdapter>> =
72 vec![Arc::new(data_store_spec_adapter), Arc::new(http_adapter)];
73 StatsigCustomizedSpecsAdapter { adapters }
74 }
75
76 #[cfg(feature = "with_grpc")]
77 fn create_grpc_adapter(
78 sdk_key: &str,
79 config: &SpecAdapterConfig,
80 options: &StatsigOptions,
81 ) -> Option<Arc<dyn SpecsAdapter>> {
82 Some(Arc::new(StatsigGrpcSpecsAdapter::new(
83 sdk_key,
84 config,
85 Some(options),
86 )))
87 }
88
89 #[cfg(not(feature = "with_grpc"))]
90 fn create_grpc_adapter(
91 _sdk_key: &str,
92 _config: &SpecAdapterConfig,
93 _options: &StatsigOptions,
94 ) -> Option<Arc<dyn SpecsAdapter>> {
95 log_e!(
96 TAG,
97 "Trying to use grpc websocket adapter but with grpc feature is not enabled"
98 );
99 None
100 }
101}
102
103#[async_trait]
104impl SpecsAdapter for StatsigCustomizedSpecsAdapter {
105 async fn start(
106 self: Arc<Self>,
107 statsig_runtime: &Arc<StatsigRuntime>,
108 ) -> Result<(), StatsigErr> {
109 for adapter in &self.adapters {
110 match adapter.to_owned().start(statsig_runtime).await {
111 Ok(()) => {
112 return Ok(());
113 }
114 Err(e) => {
115 log_w!(
116 TAG,
117 "Failed to initialize from {} adapter: {:?}",
118 adapter.get_type_name(),
119 e
120 );
121 }
123 }
124 }
125 return Err(StatsigErr::UnstartedAdapter(
126 "Failed to start any adapters".to_string(),
127 ));
128 }
129
130 fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
131 for adapter in &self.adapters {
132 adapter.initialize(listener.clone());
133 }
134 }
135
136 async fn schedule_background_sync(
137 self: Arc<Self>,
138 statsig_runtime: &Arc<StatsigRuntime>,
139 ) -> Result<(), StatsigErr> {
140 for adapter in &self.adapters {
142 match adapter
143 .to_owned()
144 .schedule_background_sync(statsig_runtime)
145 .await
146 {
147 Ok(()) => return Ok(()),
148 Err(_) => {
149 log_i!(
151 TAG,
152 "Skipping schedule bg sync for {}",
153 adapter.get_type_name()
154 );
155 }
156 }
157 }
158 Ok(())
159 }
160
161 async fn shutdown(
162 &self,
163 timeout: Duration,
164 statsig_runtime: &Arc<StatsigRuntime>,
165 ) -> Result<(), StatsigErr> {
166 let timeout_for_each = timeout
167 .checked_div(self.adapters.len() as u32)
168 .unwrap_or(timeout);
169 for adapter in &self.adapters {
170 let _ = adapter.shutdown(timeout_for_each, statsig_runtime).await;
171 }
172 Ok(())
173 }
174
175 fn get_type_name(&self) -> String {
176 stringify!(StatsigCustomizedSpecsAdapter).to_string()
177 }
178}