sigstat_grpc/
statsig_grpc_client.rs1use crate::statsig_forward_proxy::config_spec_request::ApiVersion;
2use crate::statsig_forward_proxy::statsig_forward_proxy_client::StatsigForwardProxyClient;
3use crate::statsig_forward_proxy::{ConfigSpecRequest, ConfigSpecResponse};
4use crate::statsig_grpc_err::StatsigGrpcErr;
5use std::sync::Mutex;
6use std::time::Duration;
7use tonic::transport::Channel;
8use tonic::Streaming;
9
10pub struct StatsigGrpcClient {
11 sdk_key: String,
12 proxy_api: String,
13 grpc_client: Mutex<Option<StatsigForwardProxyClient<Channel>>>,
14}
15
16impl StatsigGrpcClient {
17 pub fn new(sdk_key: &str, proxy_api: &str) -> Self {
18 Self {
19 sdk_key: sdk_key.to_string(),
20 proxy_api: proxy_api.to_string(),
21 grpc_client: Mutex::new(None),
22 }
23 }
24
25 pub async fn connect_client(&self) -> Result<(), StatsigGrpcErr> {
26 self.get_or_setup_grpc_client().await.map(|_| ())
27 }
28
29 pub fn reset_client(&self) {
30 if let Ok(mut lock) = self.grpc_client.lock() {
31 *lock = None;
32 }
33 }
34
35 pub async fn get_specs(
36 &self,
37 lcut: Option<u64>,
38 zstd_dict_id: Option<&String>,
39 ) -> Result<ConfigSpecResponse, StatsigGrpcErr> {
40 let request = create_config_spec_request(&self.sdk_key, lcut, zstd_dict_id);
41 let mut client = self.get_or_setup_grpc_client().await?;
42
43 client
44 .get_config_spec(request)
45 .await
46 .map_err(StatsigGrpcErr::ErrorGrpcStatus)
47 .map(|r| r.into_inner())
48 }
49
50 pub async fn get_specs_stream(
51 &self,
52 lcut: Option<u64>,
53 zstd_dict_id: Option<&String>,
54 ) -> Result<Streaming<ConfigSpecResponse>, StatsigGrpcErr> {
55 let request = create_config_spec_request(&self.sdk_key, lcut, zstd_dict_id);
56 let mut client = self.get_or_setup_grpc_client().await?;
57
58 client
59 .stream_config_spec(request)
60 .await
61 .map_err(StatsigGrpcErr::ErrorGrpcStatus)
62 .map(|s| s.into_inner())
63 }
64
65 async fn get_or_setup_grpc_client(
66 &self,
67 ) -> Result<StatsigForwardProxyClient<Channel>, StatsigGrpcErr> {
68 {
69 let lock = self
70 .grpc_client
71 .lock()
72 .map_err(|_| StatsigGrpcErr::FailedToGetLock)?;
73
74 if let Some(client) = lock.as_ref() {
75 return Ok(client.clone());
76 }
77 }
78
79 let channel = Channel::from_shared(self.proxy_api.clone())
80 .map_err(|e| StatsigGrpcErr::FailedToConnect(e.to_string()))?
81 .connect_timeout(Duration::from_secs(5))
82 .tcp_keepalive(Some(Duration::from_secs(30)))
83 .keep_alive_while_idle(true)
84 .http2_keep_alive_interval(Duration::from_secs(30))
85 .connect()
86 .await
87 .map_err(|e| StatsigGrpcErr::FailedToConnect(e.to_string()))?;
88
89 let new_client = StatsigForwardProxyClient::new(channel);
90
91 let mut lock = self
92 .grpc_client
93 .lock()
94 .map_err(|_| StatsigGrpcErr::FailedToGetLock)?;
95
96 *lock = Some(new_client.clone());
97 Ok(new_client)
98 }
99}
100
101fn create_config_spec_request(
102 sdk_key: &str,
103 current_lcut: Option<u64>,
104 current_zstd_dict_id: Option<&String>,
105) -> ConfigSpecRequest {
106 ConfigSpecRequest {
107 since_time: current_lcut,
108 sdk_key: sdk_key.to_string(),
109 version: Some(ApiVersion::V2 as i32),
110 zstd_dict_id: current_zstd_dict_id.cloned(),
111 }
112}