feature_probe_server/http/
handler.rs

1use super::{cors_headers, ClientParams, SdkAuthorization, ToggleUpdateParams};
2#[cfg(feature = "unstable")]
3use super::{SecretsParams, SegmentUpdateParams};
4use crate::FPServerError::{NotFound, NotReady};
5use crate::{repo::SdkRepository, FPServerError};
6use axum::{
7    async_trait,
8    extract::Query,
9    http::{HeaderMap, HeaderValue, StatusCode},
10    response::{IntoResponse, Response},
11    Json, TypedHeader,
12};
13use feature_probe_event::collector::{EventHandler, FPEventError};
14use feature_probe_server_sdk::{FPUser, Repository, SyncType, Url};
15use log::info;
16use parking_lot::Mutex;
17use reqwest::{
18    header::{self, AUTHORIZATION, USER_AGENT},
19    Client, Method,
20};
21use serde_json::Value;
22use std::{
23    collections::{HashMap, VecDeque},
24    fs,
25    path::PathBuf,
26    sync::Arc,
27    time::Duration,
28};
29use tracing::debug;
30
31#[async_trait]
32pub trait HttpHandler {
33    async fn server_sdk_toggles(
34        &self,
35        TypedHeader(SdkAuthorization(sdk_key)): TypedHeader<SdkAuthorization>,
36    ) -> Result<Response, FPServerError>;
37
38    async fn client_sdk_toggles(
39        &self,
40        Query(params): Query<ClientParams>,
41        TypedHeader(SdkAuthorization(sdk_key)): TypedHeader<SdkAuthorization>,
42    ) -> Result<Response, FPServerError>;
43
44    async fn client_sdk_events(
45        &self,
46        TypedHeader(SdkAuthorization(sdk_key)): TypedHeader<SdkAuthorization>,
47    ) -> Result<Response, FPServerError>;
48
49    async fn update_toggles(
50        &self,
51        Json(params): Json<ToggleUpdateParams>,
52    ) -> Result<Response, FPServerError>;
53
54    #[cfg(feature = "unstable")]
55    async fn update_segments(
56        &self,
57        Json(params): Json<SegmentUpdateParams>,
58    ) -> Result<Response, FPServerError>;
59
60    #[cfg(feature = "unstable")]
61    async fn check_secrets(
62        &self,
63        Json(_params): Json<SecretsParams>,
64    ) -> Result<Json<HashMap<String, String>>, FPServerError>;
65
66    async fn all_secrets(&self) -> Result<Json<HashMap<String, String>>, FPServerError>;
67}
68
69#[derive(Clone)]
70pub struct FpHttpHandler {
71    pub repo: Arc<SdkRepository>,
72    pub http_client: Arc<Client>,
73    pub events_url: Url,
74    pub analysis_url: Option<Url>,
75    pub events_timeout: Duration,
76}
77
78#[async_trait]
79impl HttpHandler for FpHttpHandler {
80    async fn server_sdk_toggles(
81        &self,
82        TypedHeader(SdkAuthorization(sdk_key)): TypedHeader<SdkAuthorization>,
83    ) -> Result<Response, FPServerError> {
84        match self.repo.server_sdk_repo_string(&sdk_key) {
85            Ok(body) => Ok((
86                StatusCode::OK,
87                [(header::CONTENT_TYPE, "application/json")],
88                body,
89            )
90                .into_response()),
91            Err(e) => match e {
92                NotReady(_) => Ok((
93                    StatusCode::SERVICE_UNAVAILABLE,
94                    [(header::CONTENT_TYPE, "application/json")],
95                    "{}",
96                )
97                    .into_response()),
98                NotFound(_) => Ok((
99                    StatusCode::OK,
100                    [(header::CONTENT_TYPE, "application/json")],
101                    serde_json::to_string(&Repository::default()).unwrap(),
102                )
103                    .into_response()),
104                _ => Err(e),
105            },
106        }
107    }
108
109    async fn client_sdk_toggles(
110        &self,
111        Query(params): Query<ClientParams>,
112        TypedHeader(SdkAuthorization(sdk_key)): TypedHeader<SdkAuthorization>,
113    ) -> Result<Response, FPServerError> {
114        let user = decode_user(params.user)?;
115        match self.repo.client_sdk_eval_string(&sdk_key, &user) {
116            Ok(body) => Ok((StatusCode::OK, cors_headers(), body).into_response()),
117            Err(e) => match e {
118                NotReady(_) => {
119                    Ok((StatusCode::SERVICE_UNAVAILABLE, cors_headers(), "{}").into_response())
120                }
121                NotFound(_) => Ok((StatusCode::OK, cors_headers(), "{}").into_response()),
122                _ => Err(e),
123            },
124        }
125    }
126
127    async fn client_sdk_events(
128        &self,
129        TypedHeader(SdkAuthorization(sdk_key)): TypedHeader<SdkAuthorization>,
130    ) -> Result<Response, FPServerError> {
131        match self.repo.client_sdk_events_string(&sdk_key) {
132            Ok(body) => Ok((StatusCode::OK, cors_headers(), body).into_response()),
133            Err(e) => match e {
134                NotReady(_) => {
135                    Ok((StatusCode::SERVICE_UNAVAILABLE, cors_headers(), "{}").into_response())
136                }
137                NotFound(_) => Ok((StatusCode::OK, cors_headers(), "{}").into_response()),
138                _ => Err(e),
139            },
140        }
141    }
142
143    async fn update_toggles(
144        &self,
145        Json(params): Json<ToggleUpdateParams>,
146    ) -> Result<Response, FPServerError> {
147        let sdk_key = params.sdk_key;
148        match self.repo.client_sync_now(&sdk_key, SyncType::Realtime) {
149            Ok(_sdk_key) => Ok((StatusCode::OK, cors_headers(), "{}").into_response()),
150            Err(e) => match e {
151                NotFound(_) => Ok((
152                    StatusCode::BAD_REQUEST,
153                    [(header::CONTENT_TYPE, "application/json")],
154                    "{}",
155                )
156                    .into_response()),
157                _ => Err(e),
158            },
159        }
160    }
161
162    #[cfg(feature = "unstable")]
163    async fn update_toggles(
164        &self,
165        Json(params): Json<ToggleUpdateParams>,
166    ) -> Result<Response, FPServerError> {
167        self.repo.update_toggles(&params.sdk_key, params.toggles)?;
168        let status = StatusCode::OK;
169        Ok(status.into_response())
170    }
171    #[cfg(feature = "unstable")]
172
173    async fn update_segments(
174        &self,
175        Json(params): Json<SegmentUpdateParams>,
176    ) -> Result<Response, FPServerError> {
177        self.repo.update_segments(params.segments)?;
178        let status = StatusCode::OK;
179        let body = "";
180        Ok((status, body).into_response())
181    }
182
183    #[cfg(feature = "unstable")]
184    async fn check_secrets(
185        &self,
186        Json(_params): Json<SecretsParams>,
187    ) -> Result<Json<HashMap<String, String>>, FPServerError> {
188        Ok(HashMap::new().into())
189    }
190
191    async fn all_secrets(&self) -> Result<Json<HashMap<String, String>>, FPServerError> {
192        let secret_keys = self.repo.secret_keys();
193        Ok(secret_keys.into())
194    }
195}
196
197#[async_trait]
198impl EventHandler for FpHttpHandler {
199    async fn handle_events(
200        &self,
201        sdk_key: String,
202        user_agent: String,
203        headers: HeaderMap,
204        data: VecDeque<Value>,
205    ) -> Result<Response, FPEventError> {
206        let http_client = self.http_client.clone();
207        let events_url = self.events_url.clone();
208        let analysis_url = self.analysis_url.clone();
209        let timeout = self.events_timeout;
210        let server_sdk_key = {
211            if sdk_key.starts_with("client") {
212                match self.repo.secret_keys().get(sdk_key.as_str()) {
213                    Some(key) => key.clone(),
214                    None => {
215                        return Ok((StatusCode::UNAUTHORIZED, cors_headers(), "{}").into_response())
216                    }
217                }
218            } else {
219                sdk_key.clone()
220            }
221        };
222        let headers = build_header(headers, server_sdk_key, &user_agent);
223        send_events(
224            headers,
225            events_url,
226            analysis_url,
227            timeout,
228            data,
229            http_client,
230        )
231        .await;
232        Ok((StatusCode::OK, cors_headers(), "{}").into_response())
233    }
234}
235
236async fn send_events(
237    headers: HeaderMap,
238    events_url: Url,
239    analysis_url: Option<Url>,
240    timeout: Duration,
241    data: VecDeque<Value>,
242    http_client: Arc<Client>,
243) {
244    if let Some(analysis_url) = analysis_url {
245        let headers = headers.clone();
246        let data = data.clone();
247        let http_client = http_client.clone();
248        tokio::spawn(async move {
249            do_send_events(headers, analysis_url, timeout, data, http_client).await;
250        });
251    }
252
253    tokio::spawn(async move {
254        do_send_events(headers, events_url, timeout, data, http_client).await;
255    });
256}
257
258async fn do_send_events(
259    headers: HeaderMap,
260    url: Url,
261    timeout: Duration,
262    data: VecDeque<Value>,
263    http_client: Arc<Client>,
264) {
265    let traffic_request = http_client
266        .request(Method::POST, url.clone())
267        .headers(headers.clone())
268        .timeout(timeout)
269        .json(&data);
270
271    debug!("traffic post data: {data:?}");
272    debug!("traffic post req: {traffic_request:?}");
273
274    match traffic_request.send().await {
275        Err(e) => info!("traffic post error: {}", e),
276        Ok(r) => info!("traffic post success: {:?}", r),
277    };
278}
279
280fn build_header(headers: HeaderMap, sdk_key: String, user_agent: &str) -> HeaderMap {
281    let ua = headers.get("ua");
282    let auth = SdkAuthorization(sdk_key).encode();
283    let mut headers = HeaderMap::with_capacity(3);
284    headers.append(AUTHORIZATION, auth);
285    if let Ok(v) = HeaderValue::from_str(user_agent) {
286        headers.append(USER_AGENT, v);
287    }
288
289    if let Some(ua) = ua {
290        headers.append("ua", ua.clone());
291    }
292    headers
293}
294
295fn decode_user(user: String) -> Result<FPUser, FPServerError> {
296    if let Ok(user) = base64::decode(user) {
297        if let Ok(user) = String::from_utf8(user) {
298            if let Ok(user) = serde_json::from_str::<FPUser>(&user) {
299                return Ok(user);
300            }
301        }
302    }
303    Err(FPServerError::UserDecodeError)
304}
305
306// used for mocking feature probe API
307#[derive(Clone, Default)]
308pub struct LocalFileHttpHandlerForTest {
309    pub version_update: bool,
310    body: Arc<Mutex<Option<String>>>,
311}
312
313#[async_trait]
314impl HttpHandler for LocalFileHttpHandlerForTest {
315    async fn server_sdk_toggles(
316        &self,
317        TypedHeader(SdkAuthorization(_sdk_key)): TypedHeader<SdkAuthorization>,
318    ) -> Result<Response, FPServerError> {
319        let mut lock = self.body.lock();
320        let body = match &*lock {
321            None => {
322                let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
323                path.push("resources/fixtures/repo.json");
324                let body = fs::read_to_string(path).unwrap();
325                *lock = Some(body.clone());
326                body
327            }
328            Some(body) => body.clone(),
329        };
330
331        if self.version_update {
332            // SAFETY: json is valid
333            let mut repo: Repository = serde_json::from_str(&body).unwrap();
334            let version = repo.version.unwrap_or_default();
335            repo.version = Some(version + 1);
336            // SAFETY: repo charset valid
337            *lock = Some(serde_json::to_string(&repo).unwrap());
338        }
339
340        Ok((
341            StatusCode::OK,
342            [(header::CONTENT_TYPE, "application/json")],
343            body,
344        )
345            .into_response())
346    }
347
348    async fn client_sdk_toggles(
349        &self,
350        Query(_params): Query<ClientParams>,
351        TypedHeader(SdkAuthorization(_sdk_key)): TypedHeader<SdkAuthorization>,
352    ) -> Result<Response, FPServerError> {
353        let body = "{}".to_owned();
354        Ok((
355            StatusCode::OK,
356            [(header::CONTENT_TYPE, "application/json")],
357            body,
358        )
359            .into_response())
360    }
361
362    async fn client_sdk_events(
363        &self,
364        TypedHeader(SdkAuthorization(_sdk_key)): TypedHeader<SdkAuthorization>,
365    ) -> Result<Response, FPServerError> {
366        let body = "{}".to_owned();
367        Ok((
368            StatusCode::OK,
369            [(header::CONTENT_TYPE, "application/json")],
370            body,
371        )
372            .into_response())
373    }
374
375    async fn update_toggles(
376        &self,
377        Json(_params): Json<ToggleUpdateParams>,
378    ) -> Result<Response, FPServerError> {
379        let body = "{}".to_owned();
380        Ok((
381            StatusCode::OK,
382            [(header::CONTENT_TYPE, "application/json")],
383            body,
384        )
385            .into_response())
386    }
387
388    #[cfg(feature = "unstable")]
389    async fn update_segments(
390        &self,
391        Json(_params): Json<SegmentUpdateParams>,
392    ) -> Result<Response, FPServerError> {
393        let status = StatusCode::OK;
394        let body = "";
395        Ok((status, body).into_response())
396    }
397
398    #[cfg(feature = "unstable")]
399    async fn check_secrets(
400        &self,
401        Json(_params): Json<SecretsParams>,
402    ) -> Result<Json<HashMap<String, String>>, FPServerError> {
403        Ok(HashMap::new().into())
404    }
405
406    async fn all_secrets(&self) -> Result<Json<HashMap<String, String>>, FPServerError> {
407        let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
408        path.push("resources/fixtures/secrets.json");
409        let json_str = fs::read_to_string(path).unwrap();
410        let secret_keys =
411            serde_json::from_str::<HashMap<String, HashMap<String, String>>>(&json_str).unwrap();
412        let secret_keys = secret_keys.get("mapping").unwrap().to_owned();
413        Ok(secret_keys.into())
414    }
415}
416
417#[async_trait]
418impl EventHandler for LocalFileHttpHandlerForTest {
419    async fn handle_events(
420        &self,
421        _sdk_key: String,
422        _user_agent: String,
423        _headers: HeaderMap,
424        _data: VecDeque<Value>,
425    ) -> Result<Response, FPEventError> {
426        Ok((StatusCode::OK, cors_headers(), "").into_response())
427    }
428}