feature_probe_server/http/
mod.rs

1mod handler;
2
3use axum::{
4    extract::{Extension, Query},
5    handler::Handler,
6    headers::HeaderName,
7    http::StatusCode,
8    response::{Html, IntoResponse, Response},
9    routing::{get, post},
10    Json, Router, TypedHeader,
11};
12use reqwest::header;
13
14use crate::FPServerError;
15use feature_probe_event::collector::{post_events, EventHandler};
16use feature_probe_server_sdk::SdkAuthorization;
17#[cfg(feature = "unstable")]
18use feature_probe_server_sdk::Segment;
19use feature_probe_server_sdk::Toggle;
20pub use handler::{FpHttpHandler, HttpHandler, LocalFileHttpHandlerForTest};
21use serde::Deserialize;
22use serde_json::json;
23use std::collections::HashMap;
24use std::net::SocketAddr;
25
26pub async fn serve_http<T>(port: u16, handler: T)
27where
28    T: HttpHandler + EventHandler + Clone + Send + Sync + 'static,
29{
30    let app = Router::new()
31        .route("/", get(root_handler))
32        .route(
33            "/api/client-sdk/toggles",
34            get(client_sdk_toggles::<T>).options(client_cors),
35        )
36        .route(
37            "/api/client-sdk/events",
38            get(client_sdk_events::<T>).options(client_cors),
39        )
40        .route("/api/server-sdk/toggles", get(server_sdk_toggles::<T>))
41        .route("/api/events", post(post_events::<T>).options(client_cors))
42        .route("/internal/all_secrets", get(all_secrets::<T>)) // not for public network
43        .route("/internal/update_toggles", post(update_toggles::<T>))
44        .layer(Extension(handler))
45        .fallback(handler_404.into_service());
46
47    #[cfg(feature = "unstable")]
48    let app = app
49        .route("/internal/server/segments", post(update_segments::<T>))
50        .route("/intelnal/server/check_secrets", post(check_secrets::<T>));
51
52    let addr = SocketAddr::from(([0, 0, 0, 0], port));
53    axum::Server::bind(&addr)
54        .serve(app.into_make_service())
55        .await
56        .unwrap();
57}
58
59async fn client_cors() -> Response {
60    (StatusCode::OK, cors_headers()).into_response()
61}
62
63async fn client_sdk_toggles<T>(
64    params: Query<ClientParams>,
65    sdk_key: TypedHeader<SdkAuthorization>,
66    Extension(handler): Extension<T>,
67) -> Result<Response, FPServerError>
68where
69    T: HttpHandler + Clone + Send + Sync + 'static,
70{
71    handler.client_sdk_toggles(params, sdk_key).await
72}
73
74async fn client_sdk_events<T>(
75    sdk_key: TypedHeader<SdkAuthorization>,
76    Extension(handler): Extension<T>,
77) -> Result<Response, FPServerError>
78where
79    T: HttpHandler + Clone + Send + Sync + 'static,
80{
81    handler.client_sdk_events(sdk_key).await
82}
83
84async fn server_sdk_toggles<T>(
85    sdk_key: TypedHeader<SdkAuthorization>,
86    Extension(handler): Extension<T>,
87) -> Result<Response, FPServerError>
88where
89    T: HttpHandler + Clone + Send + Sync + 'static,
90{
91    handler.server_sdk_toggles(sdk_key).await
92}
93
94async fn update_toggles<T>(
95    params: Json<ToggleUpdateParams>,
96    Extension(handler): Extension<T>,
97) -> Result<Response, FPServerError>
98where
99    T: HttpHandler + Clone + Send + Sync + 'static,
100{
101    handler.update_toggles(params).await
102}
103
/// `POST /internal/server/segments` (only with the `unstable` feature):
/// forwards the JSON body to [`HttpHandler::update_segments`].
#[cfg(feature = "unstable")]
async fn update_segments<T>(
    params: Json<SegmentUpdateParams>,
    Extension(sdk_handler): Extension<T>,
) -> Result<Response, FPServerError>
where
    T: HttpHandler + Clone + Send + Sync + 'static,
{
    sdk_handler.update_segments(params).await
}
114
/// Route function for the secrets-check endpoint (only with the `unstable`
/// feature): forwards the JSON body to [`HttpHandler::check_secrets`] and
/// returns the string-to-string map it produces.
#[cfg(feature = "unstable")]
async fn check_secrets<T>(
    params: Json<SecretsParams>,
    Extension(sdk_handler): Extension<T>,
) -> Result<Json<HashMap<String, String>>, FPServerError>
where
    T: HttpHandler + Clone + Send + Sync + 'static,
{
    sdk_handler.check_secrets(params).await
}
125
126async fn all_secrets<T>(
127    Extension(handler): Extension<T>,
128) -> Result<Json<HashMap<String, String>>, FPServerError>
129where
130    T: HttpHandler + Clone + Send + Sync + 'static,
131{
132    handler.all_secrets().await
133}
134
135async fn root_handler() -> Html<&'static str> {
136    Html("<h1>Feature Probe Server</h1>")
137}
138
139async fn handler_404() -> impl IntoResponse {
140    (StatusCode::NOT_FOUND, "")
141}
142
143#[derive(Debug, Deserialize)]
144pub struct ClientParams {
145    user: String,
146}
147
148#[allow(unused)]
149#[derive(Debug, Deserialize)]
150pub struct ToggleUpdateParams {
151    sdk_key: String,
152    #[serde(default)]
153    toggles: HashMap<String, Toggle>,
154    #[serde(default)]
155    version: Option<String>,
156}
157
/// JSON body extracted by the segment update route (only with the
/// `unstable` feature).
#[cfg(feature = "unstable")]
#[derive(Deserialize, Debug)]
pub struct SegmentUpdateParams {
    /// Segments keyed by string.
    segments: HashMap<String, Segment>,
}
163
/// JSON body extracted by the secrets-check route (only with the `unstable`
/// feature).
#[cfg(feature = "unstable")]
#[derive(Deserialize, Debug)]
pub struct SecretsParams {
    /// String-to-string map of secrets.
    // NOTE(review): with this field name the expected JSON key is literally
    // `_secrets` — confirm that this is intended.
    _secrets: HashMap<String, String>,
}
169
170impl IntoResponse for FPServerError {
171    fn into_response(self) -> Response {
172        let (status, error_message) = match self {
173            FPServerError::NotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
174            FPServerError::UserDecodeError => (StatusCode::BAD_REQUEST, self.to_string()),
175            _ => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()),
176        };
177
178        let body = Json(json!({
179            "error": error_message,
180        }));
181
182        (status, cors_headers(), body).into_response()
183    }
184}
185
186pub fn cors_headers() -> [(HeaderName, &'static str); 4] {
187    [
188        (header::CONTENT_TYPE, "application/json"),
189        (header::ACCESS_CONTROL_ALLOW_HEADERS, "*"),
190        (header::ACCESS_CONTROL_ALLOW_ORIGIN, "*"),
191        (
192            header::ACCESS_CONTROL_ALLOW_METHODS,
193            "GET, POST, PUT, DELETE, OPTIONS",
194        ),
195    ]
196}
197
198#[cfg(test)]
199mod tests {
200    use super::{handler::LocalFileHttpHandlerForTest, *};
201    #[cfg(feature = "realtime")]
202    use crate::realtime::RealtimeSocket;
203    use crate::{base::ServerConfig, repo::SdkRepository};
204    use feature_probe_server_sdk::{FPDetail, FPUser, Repository, SdkAuthorization};
205    use reqwest::header::CONTENT_TYPE;
206    use reqwest::{header::AUTHORIZATION, Client, Error, Method, Url};
207    use serde_json::Value;
208    use std::{fs, path::PathBuf, sync::Arc, time::Duration};
209
210    #[test]
211    fn deserialize_toggle_udpate_param() {
212        let toggle_update_param = r#"{"sdk_key": "key1"}"#;
213        let p: ToggleUpdateParams = serde_json::from_str(toggle_update_param).unwrap();
214        assert_eq!(p.sdk_key, "key1");
215    }
216
217    #[tokio::test]
218    async fn test_fp_server_connect_fp_api() {
219        let server_sdk_key = "server-sdk-key1".to_owned();
220        let client_sdk_key = "client-sdk-key1".to_owned();
221        let mock_api_port = 9002;
222        let fp_server_port = 9003;
223        setup_mock_api(mock_api_port).await;
224        let repo = setup_fp_server(
225            mock_api_port,
226            fp_server_port,
227            &client_sdk_key,
228            &server_sdk_key,
229        )
230        .await;
231        tokio::time::sleep(Duration::from_millis(100)).await; // wait fp server port listen
232        let repo_string = repo.server_sdk_repo_string(&server_sdk_key);
233        assert!(repo_string.is_ok());
234
235        let resp = http_get(
236            format!("http://127.0.0.1:{}/api/server-sdk/toggles", fp_server_port),
237            Some(server_sdk_key.clone()),
238        )
239        .await;
240        assert!(resp.is_ok());
241        let body = resp.unwrap().text().await;
242        assert!(body.is_ok());
243        let body = body.unwrap();
244        let r = serde_json::from_str::<Repository>(&body).unwrap();
245        assert_eq!(r, repo_from_test_file());
246
247        let resp = http_get(
248            format!("http://127.0.0.1:{}/api/server-sdk/toggles", fp_server_port),
249            Some("no_exist_server_key".to_owned()),
250        )
251        .await;
252        assert!(resp.is_ok());
253        let body = resp.unwrap().text().await;
254        assert!(body.is_ok());
255        let body = body.unwrap();
256        let r = serde_json::from_str::<Repository>(&body).unwrap();
257        assert_eq!(r, Repository::default());
258
259        let resp = http_get(
260            format!("http://127.0.0.1:{}/internal/all_secrets", fp_server_port),
261            None,
262        )
263        .await;
264        assert!(resp.is_ok());
265
266        let resp = http_get(
267            format!("http://127.0.0.1:{}/internal/all_secrets", mock_api_port),
268            None,
269        )
270        .await;
271        assert!(resp.is_ok());
272    }
273
274    #[tokio::test]
275    async fn test_server_sdk_toggles() {
276        let port = 9004;
277        let handler = LocalFileHttpHandlerForTest::default();
278        tokio::spawn(crate::http::serve_http::<LocalFileHttpHandlerForTest>(
279            port, handler,
280        ));
281        tokio::time::sleep(Duration::from_millis(100)).await; // wait port listen
282
283        let resp = http_get(
284            format!("http://127.0.0.1:{}/api/server-sdk/toggles", port),
285            Some("sdk-key".to_owned()),
286        )
287        .await;
288        assert!(resp.is_ok(), "response invalid");
289        let body = resp.unwrap().text().await;
290        assert!(body.is_ok(), "response body error");
291        let body = body.unwrap();
292        let r = serde_json::from_str::<Repository>(&body).unwrap();
293        assert_eq!(r, repo_from_test_file());
294    }
295
296    #[tokio::test]
297    async fn test_client_sdk_toggles() {
298        let server_sdk_key = "server-sdk-key1".to_owned();
299        let client_sdk_key = "client-sdk-key1".to_owned();
300        let mock_api_port = 9005;
301        let fp_server_port = 9006;
302        setup_mock_api(mock_api_port).await;
303        let _ = setup_fp_server(
304            mock_api_port,
305            fp_server_port,
306            &client_sdk_key,
307            &server_sdk_key,
308        )
309        .await;
310        tokio::time::sleep(Duration::from_millis(100)).await; // wait fp server port listen
311
312        let user = FPUser::new().with("city", "1");
313        let user_json = serde_json::to_string(&user).unwrap();
314        let user_base64 = base64::encode(&user_json);
315        let resp = http_get(
316            format!(
317                "http://127.0.0.1:{}/api/client-sdk/toggles?user={}",
318                fp_server_port, user_base64
319            ),
320            Some(client_sdk_key.clone()),
321        )
322        .await;
323        assert!(resp.is_ok(), "response invalid");
324        let text = resp.unwrap().text().await;
325        assert!(text.is_ok(), "response text error");
326        let text = text.unwrap();
327        let toggles = serde_json::from_str::<HashMap<String, FPDetail<Value>>>(&text);
328        assert!(toggles.is_ok(), "response can not deserialize");
329        let toggles = toggles.unwrap();
330        let t = toggles.get("bool_toggle");
331        assert!(t.is_some(), "toggle not found");
332        let t = t.unwrap();
333        assert_eq!(t.rule_index, Some(0));
334        assert_eq!(t.value.as_bool(), Some(true));
335
336        // for test coverage
337        let resp = http_get(
338            format!(
339                "http://127.0.0.1:{}/api/client-sdk/toggles?user={}",
340                mock_api_port, user_base64
341            ),
342            Some(client_sdk_key.clone()),
343        )
344        .await;
345        assert!(resp.is_ok())
346    }
347
348    #[tokio::test]
349    async fn test_events() {
350        let server_sdk_key = "server-sdk-key1".to_owned();
351        let client_sdk_key = "client-sdk-key1".to_owned();
352        let mock_api_port = 9007;
353        let fp_server_port = 9008;
354        setup_mock_api(mock_api_port).await;
355        let _ = setup_fp_server(
356            mock_api_port,
357            fp_server_port,
358            &client_sdk_key,
359            &server_sdk_key,
360        )
361        .await;
362        tokio::time::sleep(Duration::from_millis(100)).await; // wait fp server port listen
363
364        let resp = http_post(
365            format!("http://127.0.0.1:{}/api/events", fp_server_port),
366            Some(client_sdk_key.clone()),
367            "[]".to_owned(),
368        )
369        .await;
370        assert!(resp.is_ok());
371
372        let resp = http_post(
373            format!("http://127.0.0.1:{}/api/events", mock_api_port),
374            Some(client_sdk_key.clone()),
375            "[]".to_owned(),
376        )
377        .await;
378        assert!(resp.is_ok());
379    }
380
381    async fn http_get(url: String, sdk_key: Option<String>) -> Result<reqwest::Response, Error> {
382        let url = Url::parse(&url).unwrap();
383        let timeout = Duration::from_secs(1);
384        let mut request = Client::new().request(Method::GET, url);
385
386        if let Some(sdk_key) = sdk_key {
387            let auth = SdkAuthorization(sdk_key).encode();
388            request = request.header(AUTHORIZATION, auth);
389        }
390        request = request.timeout(timeout);
391        request.send().await
392    }
393
394    async fn http_post(
395        url: String,
396        sdk_key: Option<String>,
397        body: String,
398    ) -> Result<reqwest::Response, Error> {
399        let url = Url::parse(&url).unwrap();
400        let timeout = Duration::from_secs(1);
401        let mut request = Client::new()
402            .request(Method::POST, url)
403            .header(CONTENT_TYPE, "application/json")
404            .header("user-agent", "Rust/0.0.0")
405            .header("ua", "Rust/0.0.0")
406            .body(body);
407
408        if let Some(sdk_key) = sdk_key {
409            let auth = SdkAuthorization(sdk_key).encode();
410            request = request.header(AUTHORIZATION, auth);
411        }
412        request = request.timeout(timeout);
413        request.send().await
414    }
415
416    fn repo_from_test_file() -> Repository {
417        let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
418        path.push("resources/fixtures/repo.json");
419        let json_str = fs::read_to_string(path).unwrap();
420        serde_json::from_str::<Repository>(&json_str).unwrap()
421    }
422
423    async fn setup_mock_api(port: u16) {
424        let mock_feature_probe_api = LocalFileHttpHandlerForTest::default();
425        tokio::spawn(crate::http::serve_http::<LocalFileHttpHandlerForTest>(
426            port,
427            mock_feature_probe_api,
428        ));
429        tokio::time::sleep(Duration::from_millis(100)).await;
430    }
431
432    async fn setup_fp_server(
433        target_port: u16,
434        listen_port: u16,
435        client_sdk_key: &str,
436        server_sdk_key: &str,
437    ) -> Arc<SdkRepository> {
438        let toggles_url = Url::parse(&format!(
439            "http://127.0.0.1:{}/api/server-sdk/toggles",
440            target_port
441        ))
442        .unwrap();
443        let events_url =
444            Url::parse(&format!("http://127.0.0.1:{}/api/events", target_port)).unwrap();
445        let analysis_url = None;
446        let config = ServerConfig {
447            toggles_url,
448            events_url: events_url.clone(),
449            refresh_interval: Duration::from_secs(1),
450            analysis_url: None,
451            keys_url: None,
452            client_sdk_key: Some(client_sdk_key.to_owned()),
453            server_sdk_key: Some(server_sdk_key.to_owned()),
454            server_port: listen_port,
455            #[cfg(feature = "realtime")]
456            realtime_port: listen_port + 100,
457            #[cfg(feature = "realtime")]
458            realtime_path: "/server/realtime".to_owned(),
459        };
460
461        #[cfg(feature = "realtime")]
462        let rs = RealtimeSocket::serve(config.realtime_port, &config.realtime_path);
463
464        let repo = SdkRepository::new(
465            config,
466            #[cfg(feature = "realtime")]
467            rs,
468        );
469        repo.sync(client_sdk_key.to_owned(), server_sdk_key.to_owned(), 1);
470        let repo = Arc::new(repo);
471        let feature_probe_server = FpHttpHandler {
472            repo: repo.clone(),
473            events_url,
474            analysis_url,
475            events_timeout: Duration::from_secs(10),
476            http_client: Default::default(),
477        };
478        tokio::spawn(crate::http::serve_http::<FpHttpHandler>(
479            listen_port,
480            feature_probe_server,
481        ));
482        repo
483    }
484}