1use super::{cors_headers, ClientParams, SdkAuthorization, ToggleUpdateParams};
2#[cfg(feature = "unstable")]
3use super::{SecretsParams, SegmentUpdateParams};
4use crate::FPServerError::{NotFound, NotReady};
5use crate::{repo::SdkRepository, FPServerError};
6use axum::{
7 async_trait,
8 extract::Query,
9 http::{HeaderMap, HeaderValue, StatusCode},
10 response::{IntoResponse, Response},
11 Json, TypedHeader,
12};
13use feature_probe_event::collector::{EventHandler, FPEventError};
14use feature_probe_server_sdk::{FPUser, Repository, SyncType, Url};
15use log::info;
16use parking_lot::Mutex;
17use reqwest::{
18 header::{self, AUTHORIZATION, USER_AGENT},
19 Client, Method,
20};
21use serde_json::Value;
22use std::{
23 collections::{HashMap, VecDeque},
24 fs,
25 path::PathBuf,
26 sync::Arc,
27 time::Duration,
28};
29use tracing::debug;
30
31#[async_trait]
32pub trait HttpHandler {
33 async fn server_sdk_toggles(
34 &self,
35 TypedHeader(SdkAuthorization(sdk_key)): TypedHeader<SdkAuthorization>,
36 ) -> Result<Response, FPServerError>;
37
38 async fn client_sdk_toggles(
39 &self,
40 Query(params): Query<ClientParams>,
41 TypedHeader(SdkAuthorization(sdk_key)): TypedHeader<SdkAuthorization>,
42 ) -> Result<Response, FPServerError>;
43
44 async fn client_sdk_events(
45 &self,
46 TypedHeader(SdkAuthorization(sdk_key)): TypedHeader<SdkAuthorization>,
47 ) -> Result<Response, FPServerError>;
48
49 async fn update_toggles(
50 &self,
51 Json(params): Json<ToggleUpdateParams>,
52 ) -> Result<Response, FPServerError>;
53
54 #[cfg(feature = "unstable")]
55 async fn update_segments(
56 &self,
57 Json(params): Json<SegmentUpdateParams>,
58 ) -> Result<Response, FPServerError>;
59
60 #[cfg(feature = "unstable")]
61 async fn check_secrets(
62 &self,
63 Json(_params): Json<SecretsParams>,
64 ) -> Result<Json<HashMap<String, String>>, FPServerError>;
65
66 async fn all_secrets(&self) -> Result<Json<HashMap<String, String>>, FPServerError>;
67}
68
69#[derive(Clone)]
70pub struct FpHttpHandler {
71 pub repo: Arc<SdkRepository>,
72 pub http_client: Arc<Client>,
73 pub events_url: Url,
74 pub analysis_url: Option<Url>,
75 pub events_timeout: Duration,
76}
77
78#[async_trait]
79impl HttpHandler for FpHttpHandler {
80 async fn server_sdk_toggles(
81 &self,
82 TypedHeader(SdkAuthorization(sdk_key)): TypedHeader<SdkAuthorization>,
83 ) -> Result<Response, FPServerError> {
84 match self.repo.server_sdk_repo_string(&sdk_key) {
85 Ok(body) => Ok((
86 StatusCode::OK,
87 [(header::CONTENT_TYPE, "application/json")],
88 body,
89 )
90 .into_response()),
91 Err(e) => match e {
92 NotReady(_) => Ok((
93 StatusCode::SERVICE_UNAVAILABLE,
94 [(header::CONTENT_TYPE, "application/json")],
95 "{}",
96 )
97 .into_response()),
98 NotFound(_) => Ok((
99 StatusCode::OK,
100 [(header::CONTENT_TYPE, "application/json")],
101 serde_json::to_string(&Repository::default()).unwrap(),
102 )
103 .into_response()),
104 _ => Err(e),
105 },
106 }
107 }
108
109 async fn client_sdk_toggles(
110 &self,
111 Query(params): Query<ClientParams>,
112 TypedHeader(SdkAuthorization(sdk_key)): TypedHeader<SdkAuthorization>,
113 ) -> Result<Response, FPServerError> {
114 let user = decode_user(params.user)?;
115 match self.repo.client_sdk_eval_string(&sdk_key, &user) {
116 Ok(body) => Ok((StatusCode::OK, cors_headers(), body).into_response()),
117 Err(e) => match e {
118 NotReady(_) => {
119 Ok((StatusCode::SERVICE_UNAVAILABLE, cors_headers(), "{}").into_response())
120 }
121 NotFound(_) => Ok((StatusCode::OK, cors_headers(), "{}").into_response()),
122 _ => Err(e),
123 },
124 }
125 }
126
127 async fn client_sdk_events(
128 &self,
129 TypedHeader(SdkAuthorization(sdk_key)): TypedHeader<SdkAuthorization>,
130 ) -> Result<Response, FPServerError> {
131 match self.repo.client_sdk_events_string(&sdk_key) {
132 Ok(body) => Ok((StatusCode::OK, cors_headers(), body).into_response()),
133 Err(e) => match e {
134 NotReady(_) => {
135 Ok((StatusCode::SERVICE_UNAVAILABLE, cors_headers(), "{}").into_response())
136 }
137 NotFound(_) => Ok((StatusCode::OK, cors_headers(), "{}").into_response()),
138 _ => Err(e),
139 },
140 }
141 }
142
143 async fn update_toggles(
144 &self,
145 Json(params): Json<ToggleUpdateParams>,
146 ) -> Result<Response, FPServerError> {
147 let sdk_key = params.sdk_key;
148 match self.repo.client_sync_now(&sdk_key, SyncType::Realtime) {
149 Ok(_sdk_key) => Ok((StatusCode::OK, cors_headers(), "{}").into_response()),
150 Err(e) => match e {
151 NotFound(_) => Ok((
152 StatusCode::BAD_REQUEST,
153 [(header::CONTENT_TYPE, "application/json")],
154 "{}",
155 )
156 .into_response()),
157 _ => Err(e),
158 },
159 }
160 }
161
162 #[cfg(feature = "unstable")]
163 async fn update_toggles(
164 &self,
165 Json(params): Json<ToggleUpdateParams>,
166 ) -> Result<Response, FPServerError> {
167 self.repo.update_toggles(¶ms.sdk_key, params.toggles)?;
168 let status = StatusCode::OK;
169 Ok(status.into_response())
170 }
171 #[cfg(feature = "unstable")]
172
173 async fn update_segments(
174 &self,
175 Json(params): Json<SegmentUpdateParams>,
176 ) -> Result<Response, FPServerError> {
177 self.repo.update_segments(params.segments)?;
178 let status = StatusCode::OK;
179 let body = "";
180 Ok((status, body).into_response())
181 }
182
183 #[cfg(feature = "unstable")]
184 async fn check_secrets(
185 &self,
186 Json(_params): Json<SecretsParams>,
187 ) -> Result<Json<HashMap<String, String>>, FPServerError> {
188 Ok(HashMap::new().into())
189 }
190
191 async fn all_secrets(&self) -> Result<Json<HashMap<String, String>>, FPServerError> {
192 let secret_keys = self.repo.secret_keys();
193 Ok(secret_keys.into())
194 }
195}
196
197#[async_trait]
198impl EventHandler for FpHttpHandler {
199 async fn handle_events(
200 &self,
201 sdk_key: String,
202 user_agent: String,
203 headers: HeaderMap,
204 data: VecDeque<Value>,
205 ) -> Result<Response, FPEventError> {
206 let http_client = self.http_client.clone();
207 let events_url = self.events_url.clone();
208 let analysis_url = self.analysis_url.clone();
209 let timeout = self.events_timeout;
210 let server_sdk_key = {
211 if sdk_key.starts_with("client") {
212 match self.repo.secret_keys().get(sdk_key.as_str()) {
213 Some(key) => key.clone(),
214 None => {
215 return Ok((StatusCode::UNAUTHORIZED, cors_headers(), "{}").into_response())
216 }
217 }
218 } else {
219 sdk_key.clone()
220 }
221 };
222 let headers = build_header(headers, server_sdk_key, &user_agent);
223 send_events(
224 headers,
225 events_url,
226 analysis_url,
227 timeout,
228 data,
229 http_client,
230 )
231 .await;
232 Ok((StatusCode::OK, cors_headers(), "{}").into_response())
233 }
234}
235
236async fn send_events(
237 headers: HeaderMap,
238 events_url: Url,
239 analysis_url: Option<Url>,
240 timeout: Duration,
241 data: VecDeque<Value>,
242 http_client: Arc<Client>,
243) {
244 if let Some(analysis_url) = analysis_url {
245 let headers = headers.clone();
246 let data = data.clone();
247 let http_client = http_client.clone();
248 tokio::spawn(async move {
249 do_send_events(headers, analysis_url, timeout, data, http_client).await;
250 });
251 }
252
253 tokio::spawn(async move {
254 do_send_events(headers, events_url, timeout, data, http_client).await;
255 });
256}
257
258async fn do_send_events(
259 headers: HeaderMap,
260 url: Url,
261 timeout: Duration,
262 data: VecDeque<Value>,
263 http_client: Arc<Client>,
264) {
265 let traffic_request = http_client
266 .request(Method::POST, url.clone())
267 .headers(headers.clone())
268 .timeout(timeout)
269 .json(&data);
270
271 debug!("traffic post data: {data:?}");
272 debug!("traffic post req: {traffic_request:?}");
273
274 match traffic_request.send().await {
275 Err(e) => info!("traffic post error: {}", e),
276 Ok(r) => info!("traffic post success: {:?}", r),
277 };
278}
279
280fn build_header(headers: HeaderMap, sdk_key: String, user_agent: &str) -> HeaderMap {
281 let ua = headers.get("ua");
282 let auth = SdkAuthorization(sdk_key).encode();
283 let mut headers = HeaderMap::with_capacity(3);
284 headers.append(AUTHORIZATION, auth);
285 if let Ok(v) = HeaderValue::from_str(user_agent) {
286 headers.append(USER_AGENT, v);
287 }
288
289 if let Some(ua) = ua {
290 headers.append("ua", ua.clone());
291 }
292 headers
293}
294
295fn decode_user(user: String) -> Result<FPUser, FPServerError> {
296 if let Ok(user) = base64::decode(user) {
297 if let Ok(user) = String::from_utf8(user) {
298 if let Ok(user) = serde_json::from_str::<FPUser>(&user) {
299 return Ok(user);
300 }
301 }
302 }
303 Err(FPServerError::UserDecodeError)
304}
305
306#[derive(Clone, Default)]
308pub struct LocalFileHttpHandlerForTest {
309 pub version_update: bool,
310 body: Arc<Mutex<Option<String>>>,
311}
312
313#[async_trait]
314impl HttpHandler for LocalFileHttpHandlerForTest {
315 async fn server_sdk_toggles(
316 &self,
317 TypedHeader(SdkAuthorization(_sdk_key)): TypedHeader<SdkAuthorization>,
318 ) -> Result<Response, FPServerError> {
319 let mut lock = self.body.lock();
320 let body = match &*lock {
321 None => {
322 let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
323 path.push("resources/fixtures/repo.json");
324 let body = fs::read_to_string(path).unwrap();
325 *lock = Some(body.clone());
326 body
327 }
328 Some(body) => body.clone(),
329 };
330
331 if self.version_update {
332 let mut repo: Repository = serde_json::from_str(&body).unwrap();
334 let version = repo.version.unwrap_or_default();
335 repo.version = Some(version + 1);
336 *lock = Some(serde_json::to_string(&repo).unwrap());
338 }
339
340 Ok((
341 StatusCode::OK,
342 [(header::CONTENT_TYPE, "application/json")],
343 body,
344 )
345 .into_response())
346 }
347
348 async fn client_sdk_toggles(
349 &self,
350 Query(_params): Query<ClientParams>,
351 TypedHeader(SdkAuthorization(_sdk_key)): TypedHeader<SdkAuthorization>,
352 ) -> Result<Response, FPServerError> {
353 let body = "{}".to_owned();
354 Ok((
355 StatusCode::OK,
356 [(header::CONTENT_TYPE, "application/json")],
357 body,
358 )
359 .into_response())
360 }
361
362 async fn client_sdk_events(
363 &self,
364 TypedHeader(SdkAuthorization(_sdk_key)): TypedHeader<SdkAuthorization>,
365 ) -> Result<Response, FPServerError> {
366 let body = "{}".to_owned();
367 Ok((
368 StatusCode::OK,
369 [(header::CONTENT_TYPE, "application/json")],
370 body,
371 )
372 .into_response())
373 }
374
375 async fn update_toggles(
376 &self,
377 Json(_params): Json<ToggleUpdateParams>,
378 ) -> Result<Response, FPServerError> {
379 let body = "{}".to_owned();
380 Ok((
381 StatusCode::OK,
382 [(header::CONTENT_TYPE, "application/json")],
383 body,
384 )
385 .into_response())
386 }
387
388 #[cfg(feature = "unstable")]
389 async fn update_segments(
390 &self,
391 Json(_params): Json<SegmentUpdateParams>,
392 ) -> Result<Response, FPServerError> {
393 let status = StatusCode::OK;
394 let body = "";
395 Ok((status, body).into_response())
396 }
397
398 #[cfg(feature = "unstable")]
399 async fn check_secrets(
400 &self,
401 Json(_params): Json<SecretsParams>,
402 ) -> Result<Json<HashMap<String, String>>, FPServerError> {
403 Ok(HashMap::new().into())
404 }
405
406 async fn all_secrets(&self) -> Result<Json<HashMap<String, String>>, FPServerError> {
407 let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
408 path.push("resources/fixtures/secrets.json");
409 let json_str = fs::read_to_string(path).unwrap();
410 let secret_keys =
411 serde_json::from_str::<HashMap<String, HashMap<String, String>>>(&json_str).unwrap();
412 let secret_keys = secret_keys.get("mapping").unwrap().to_owned();
413 Ok(secret_keys.into())
414 }
415}
416
417#[async_trait]
418impl EventHandler for LocalFileHttpHandlerForTest {
419 async fn handle_events(
420 &self,
421 _sdk_key: String,
422 _user_agent: String,
423 _headers: HeaderMap,
424 _data: VecDeque<Value>,
425 ) -> Result<Response, FPEventError> {
426 Ok((StatusCode::OK, cors_headers(), "").into_response())
427 }
428}