sigstat_grpc/
statsig_grpc_client.rs

1use crate::statsig_forward_proxy::config_spec_request::ApiVersion;
2use crate::statsig_forward_proxy::statsig_forward_proxy_client::StatsigForwardProxyClient;
3use crate::statsig_forward_proxy::{ConfigSpecRequest, ConfigSpecResponse};
4use crate::statsig_grpc_err::StatsigGrpcErr;
5use std::sync::Mutex;
6use std::time::Duration;
7use tonic::transport::Channel;
8use tonic::Streaming;
9
10pub struct StatsigGrpcClient {
11    sdk_key: String,
12    proxy_api: String,
13    grpc_client: Mutex<Option<StatsigForwardProxyClient<Channel>>>,
14}
15
16impl StatsigGrpcClient {
17    pub fn new(sdk_key: &str, proxy_api: &str) -> Self {
18        Self {
19            sdk_key: sdk_key.to_string(),
20            proxy_api: proxy_api.to_string(),
21            grpc_client: Mutex::new(None),
22        }
23    }
24
25    pub async fn connect_client(&self) -> Result<(), StatsigGrpcErr> {
26        self.get_or_setup_grpc_client().await.map(|_| ())
27    }
28
29    pub fn reset_client(&self) {
30        if let Ok(mut lock) = self.grpc_client.lock() {
31            *lock = None;
32        }
33    }
34
35    pub async fn get_specs(
36        &self,
37        lcut: Option<u64>,
38        zstd_dict_id: Option<&String>,
39    ) -> Result<ConfigSpecResponse, StatsigGrpcErr> {
40        let request = create_config_spec_request(&self.sdk_key, lcut, zstd_dict_id);
41        let mut client = self.get_or_setup_grpc_client().await?;
42
43        client
44            .get_config_spec(request)
45            .await
46            .map_err(StatsigGrpcErr::ErrorGrpcStatus)
47            .map(|r| r.into_inner())
48    }
49
50    pub async fn get_specs_stream(
51        &self,
52        lcut: Option<u64>,
53        zstd_dict_id: Option<&String>,
54    ) -> Result<Streaming<ConfigSpecResponse>, StatsigGrpcErr> {
55        let request = create_config_spec_request(&self.sdk_key, lcut, zstd_dict_id);
56        let mut client = self.get_or_setup_grpc_client().await?;
57
58        client
59            .stream_config_spec(request)
60            .await
61            .map_err(StatsigGrpcErr::ErrorGrpcStatus)
62            .map(|s| s.into_inner())
63    }
64
65    async fn get_or_setup_grpc_client(
66        &self,
67    ) -> Result<StatsigForwardProxyClient<Channel>, StatsigGrpcErr> {
68        {
69            let lock = self
70                .grpc_client
71                .lock()
72                .map_err(|_| StatsigGrpcErr::FailedToGetLock)?;
73
74            if let Some(client) = lock.as_ref() {
75                return Ok(client.clone());
76            }
77        }
78
79        let channel = Channel::from_shared(self.proxy_api.clone())
80            .map_err(|e| StatsigGrpcErr::FailedToConnect(e.to_string()))?
81            .connect_timeout(Duration::from_secs(5))
82            .tcp_keepalive(Some(Duration::from_secs(30)))
83            .keep_alive_while_idle(true)
84            .http2_keep_alive_interval(Duration::from_secs(30))
85            .connect()
86            .await
87            .map_err(|e| StatsigGrpcErr::FailedToConnect(e.to_string()))?;
88
89        let new_client = StatsigForwardProxyClient::new(channel);
90
91        let mut lock = self
92            .grpc_client
93            .lock()
94            .map_err(|_| StatsigGrpcErr::FailedToGetLock)?;
95
96        *lock = Some(new_client.clone());
97        Ok(new_client)
98    }
99}
100
101fn create_config_spec_request(
102    sdk_key: &str,
103    current_lcut: Option<u64>,
104    current_zstd_dict_id: Option<&String>,
105) -> ConfigSpecRequest {
106    ConfigSpecRequest {
107        since_time: current_lcut,
108        sdk_key: sdk_key.to_string(),
109        version: Some(ApiVersion::V2 as i32),
110        zstd_dict_id: current_zstd_dict_id.cloned(),
111    }
112}