statsig_rust/specs_adapter/
statsig_customized_specs_adapter.rs1use std::{sync::Arc, time::Duration};
2
3use super::statsig_data_store_specs_adapter::StatsigDataStoreSpecsAdapter;
4use super::StatsigHttpSpecsAdapter;
5use super::{SpecAdapterConfig, SpecsAdapterType};
6use crate::data_store_interface::DataStoreTrait;
7use crate::hashing::HashUtil;
8use crate::{log_i, log_w, SpecsAdapter, SpecsUpdateListener, StatsigOptions};
9use crate::{StatsigErr, StatsigRuntime};
10use async_trait::async_trait;
11
12#[cfg(feature = "with_grpc")]
13use super::statsig_grpc_specs_adapter::StatsigGrpcSpecsAdapter;
14
15#[cfg(not(feature = "with_grpc"))]
16use crate::log_e;
17
18const TAG: &str = stringify!(StatsigCustomizedSpecsAdapter);
19
20pub struct StatsigCustomizedSpecsAdapter {
21 adapters: Vec<Arc<dyn SpecsAdapter>>,
22}
23
24impl StatsigCustomizedSpecsAdapter {
25 pub fn new_from_config(
26 sdk_key: &str,
27 configs: Vec<SpecAdapterConfig>,
28 options: &StatsigOptions,
29 hashing: &HashUtil,
30 ) -> Self {
31 let mut adapters: Vec<Arc<dyn SpecsAdapter>> = Vec::new();
32 for config in &configs {
33 match config.adapter_type {
34 SpecsAdapterType::NetworkGrpcWebsocket => {
35 if let Some(adapter) = Self::create_grpc_adapter(sdk_key, config, options) {
36 adapters.push(adapter);
37 }
38 }
39 SpecsAdapterType::NetworkHttp => {
40 adapters.push(Arc::new(StatsigHttpSpecsAdapter::new(
42 sdk_key,
43 Some(options),
44 )));
45 }
46 SpecsAdapterType::DataStore => match options.data_store.clone() {
47 Some(data_store) => {
48 adapters.push(Arc::new(StatsigDataStoreSpecsAdapter::new(
49 sdk_key,
50 data_store,
51 hashing,
52 Some(options),
53 )));
54 }
55 None => log_w!(TAG, "Datastore is not present for syncing spec"),
56 },
57 }
58 }
59
60 StatsigCustomizedSpecsAdapter { adapters }
61 }
62
63 pub fn new_from_data_store(
64 sdk_key: &str,
65 data_store: Arc<dyn DataStoreTrait>,
66 options: &StatsigOptions,
67 hashing: &HashUtil,
68 ) -> Self {
69 let data_adapter_spec_adapter =
70 StatsigDataStoreSpecsAdapter::new(sdk_key, data_store, hashing, Some(options));
71 let http_adapter = StatsigHttpSpecsAdapter::new(sdk_key, Some(options));
72 let adapters: Vec<Arc<dyn SpecsAdapter>> =
73 vec![Arc::new(data_adapter_spec_adapter), Arc::new(http_adapter)];
74 StatsigCustomizedSpecsAdapter { adapters }
75 }
76
77 #[cfg(feature = "with_grpc")]
78 fn create_grpc_adapter(
79 sdk_key: &str,
80 config: &SpecAdapterConfig,
81 options: &StatsigOptions,
82 ) -> Option<Arc<dyn SpecsAdapter>> {
83 Some(Arc::new(StatsigGrpcSpecsAdapter::new(
84 sdk_key,
85 config,
86 Some(options),
87 )))
88 }
89
90 #[cfg(not(feature = "with_grpc"))]
91 fn create_grpc_adapter(
92 _sdk_key: &str,
93 _config: &SpecAdapterConfig,
94 _options: &StatsigOptions,
95 ) -> Option<Arc<dyn SpecsAdapter>> {
96 log_e!(
97 TAG,
98 "Trying to use grpc websocket adapter but with grpc feature is not enabled"
99 );
100 None
101 }
102}
103
104#[async_trait]
105impl SpecsAdapter for StatsigCustomizedSpecsAdapter {
106 async fn start(
107 self: Arc<Self>,
108 statsig_runtime: &Arc<StatsigRuntime>,
109 ) -> Result<(), StatsigErr> {
110 for adapter in &self.adapters {
111 match adapter.to_owned().start(statsig_runtime).await {
112 Ok(()) => {
113 return Ok(());
114 }
115 Err(e) => {
116 log_w!(
117 TAG,
118 "Failed to initialize from {} adapter: {:?}",
119 adapter.get_type_name(),
120 e
121 );
122 }
124 }
125 }
126 return Err(StatsigErr::UnstartedAdapter(
127 "Failed to start any adapters".to_string(),
128 ));
129 }
130
131 fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
132 for adapter in &self.adapters {
133 adapter.initialize(listener.clone());
134 }
135 }
136
137 async fn schedule_background_sync(
138 self: Arc<Self>,
139 statsig_runtime: &Arc<StatsigRuntime>,
140 ) -> Result<(), StatsigErr> {
141 for adapter in &self.adapters {
143 match adapter
144 .to_owned()
145 .schedule_background_sync(statsig_runtime)
146 .await
147 {
148 Ok(()) => return Ok(()),
149 Err(_) => {
150 log_i!(
152 TAG,
153 "Skipping schedule bg sync for {}",
154 adapter.get_type_name()
155 );
156 }
157 }
158 }
159 Ok(())
160 }
161
162 async fn shutdown(
163 &self,
164 timeout: Duration,
165 statsig_runtime: &Arc<StatsigRuntime>,
166 ) -> Result<(), StatsigErr> {
167 let timeout_for_each = timeout
168 .checked_div(self.adapters.len() as u32)
169 .unwrap_or(timeout);
170 for adapter in &self.adapters {
171 let _ = adapter.shutdown(timeout_for_each, statsig_runtime).await;
172 }
173 Ok(())
174 }
175
176 fn get_type_name(&self) -> String {
177 stringify!(StatsigCustomizedSpecsAdapter).to_string()
178 }
179}