1use crate::billing::app_store_client::AppStoreClient;
2use crate::billing::billing_service::*;
3use crate::billing::google_play_client::GooglePlayClient;
4use crate::billing::stripe_client::StripeClient;
5use crate::config::Config;
6use crate::document_service::DocumentService;
7use crate::utils::get_build_info;
8use crate::{ServerError, ServerState, handle_version_header, router_service, verify_auth};
9use lazy_static::lazy_static;
10use lb_rs::model::api::{ErrorWrapper, Request, RequestWrapper, *};
11use lb_rs::model::errors::{LbErrKind, SignError};
12use prometheus::{
13 CounterVec, HistogramVec, TextEncoder, register_counter_vec, register_histogram_vec,
14};
15use serde::Serialize;
16use serde::de::DeserializeOwned;
17use std::collections::HashMap;
18use std::sync::Arc;
19use tracing::*;
20use warp::http::{HeaderValue, Method, StatusCode};
21use warp::hyper::body::Bytes;
22use warp::{Filter, Rejection, reject};
23
lazy_static! {
    /// Histogram of HTTP request durations in seconds, registered under the name
    /// `lockbook_server_request_duration_seconds` with a single `request` label.
    ///
    /// Within this file the label value is the request's `ROUTE`. Registration failure
    /// panics via `unwrap`.
    pub static ref HTTP_REQUEST_DURATION_HISTOGRAM: HistogramVec = register_histogram_vec!(
        "lockbook_server_request_duration_seconds",
        "Lockbook server's HTTP request duration in seconds",
        &["request"]
    )
    .unwrap();
    /// Counter of core version request attempts, registered under the name
    /// `lockbook_server_core_version` with a single `request` label.
    ///
    /// NOTE(review): nothing visible in this file increments it; presumably the version
    /// header handling does — confirm against its users. Registration failure panics via
    /// `unwrap`.
    pub static ref CORE_VERSION_COUNTER: CounterVec = register_counter_vec!(
        "lockbook_server_core_version",
        "Core version request attempts",
        &["request"]
    )
    .unwrap();
}
38
/// Builds the warp filter for one signed API endpoint.
///
/// Arguments:
/// * `$Req` — the request type; its `METHOD` and `ROUTE` constants select the HTTP method and
///   the path (the first character of `ROUTE` is skipped when building the path filter).
/// * `$handler` — path to the async handler, invoked as `$handler(state, request_context)`.
/// * `$state` — identifier of the shared server state; it is cloned into the filter.
///
/// The expansion names `Arc`, `Bytes`, `RequestWrapper`, `ServerState`, `base64`, `warp` and the
/// generic parameters `S, A, G, D` without importing them, so they must be in scope where the
/// macro is invoked.
///
/// Per request the generated filter:
/// 1. reads the body, the optional `Accept-Version` header and the remote address;
/// 2. runs `deserialize_and_check`; on failure replies `400` with the JSON-encoded error;
/// 3. looks up the username for the request's public key (falling back to `~unknown~`);
/// 4. awaits the handler and replies with JSON: `200` on success, `400` for
///    `ServerError::ClientError`, `500` for `ServerError::InternalError`;
/// 5. records the latency in `HTTP_REQUEST_DURATION_HISTOGRAM` and logs one summary event at a
///    level matching the outcome.
#[macro_export]
macro_rules! core_req {
    ($Req: ty, $handler: path, $state: ident) => {{
        use lb_rs::model::api::{ErrorWrapper, Request};
        use lb_rs::model::file_metadata::Owner;
        use std::net::SocketAddr;
        use tracing::*;
        use $crate::router_service::{self, deserialize_and_check, method};
        use $crate::{RequestContext, ServerError};

        let cloned_state = $state.clone();

        method(<$Req>::METHOD)
            .and(warp::path(&<$Req>::ROUTE[1..]))
            .and(warp::any().map(move || cloned_state.clone()))
            .and(warp::body::bytes())
            .and(warp::header::optional::<String>("Accept-Version"))
            .and(warp::filters::addr::remote())
            .then(
                |state: Arc<ServerState<S, A, G, D>>,
                 request: Bytes,
                 version: Option<String>,
                 ip: Option<SocketAddr>| {
                    let span1 = span!(
                        Level::INFO,
                        "matched_request",
                        http.method = &<$Req>::METHOD.as_str(),
                        http.url = &<$Req>::ROUTE,
                        http.remote_ip = ip
                            .map(|ip| ip.to_string())
                            .unwrap_or_else(|| String::from("unsupported")),
                        core_version = version
                            .clone()
                            .unwrap_or_else(|| String::from("not-present")),
                    );

                    async move {
                        // Reported exactly once per request, inside the request span so the
                        // event carries the method/url fields, and before parsing can bail out.
                        if ip.is_none() {
                            tracing::error!("ip not present in request");
                        }

                        let state = state.as_ref();
                        let timer = router_service::HTTP_REQUEST_DURATION_HISTOGRAM
                            .with_label_values(&[<$Req>::ROUTE])
                            .start_timer();

                        let request: RequestWrapper<$Req> =
                            match deserialize_and_check(&state.config, request, version) {
                                Ok(req) => req,
                                Err(err) => {
                                    warn!("request failed to parse: {:?}", err);
                                    return warp::reply::with_status(
                                        warp::reply::json::<Result<RequestWrapper<$Req>, _>>(&Err(
                                            err,
                                        )),
                                        warp::http::StatusCode::BAD_REQUEST,
                                    );
                                }
                            };

                        let req_pk = request.signed_request.public_key;
                        // The lock guard lives only for this block, so it is released before
                        // the handler runs.
                        let username = {
                            let db = state.index_db.lock().await;
                            db.accounts
                                .get()
                                .get(&Owner(req_pk))
                                .map(|account| account.username.clone())
                                .unwrap_or_else(|| "~unknown~".to_string())
                        };
                        let req_pk = base64::encode(req_pk.serialize_compressed());

                        let span2 = span!(
                            Level::INFO,
                            "verified_request_signature",
                            username = username.as_str(),
                            public_key = req_pk.as_str()
                        );
                        let rc: RequestContext<$Req> = RequestContext {
                            request: request.signed_request.timestamped_value.value,
                            public_key: request.signed_request.public_key,
                            ip,
                        };

                        async move {
                            let status;
                            let mut level = tracing::Level::INFO;
                            let to_serialize = match $handler(state, rc).await {
                                Ok(response) => {
                                    status = warp::http::StatusCode::OK;
                                    Ok(response)
                                }
                                Err(ServerError::ClientError(e)) => {
                                    status = warp::http::StatusCode::BAD_REQUEST;
                                    level = tracing::Level::WARN;
                                    Err(ErrorWrapper::Endpoint(e))
                                }
                                Err(ServerError::InternalError(e)) => {
                                    status = warp::http::StatusCode::INTERNAL_SERVER_ERROR;
                                    level = tracing::Level::ERROR;
                                    // The detail is logged here; the client only receives the
                                    // opaque `InternalError` variant.
                                    tracing::error!("internal: {e}");
                                    Err(ErrorWrapper::InternalError)
                                }
                            };
                            let response =
                                warp::reply::with_status(warp::reply::json(&to_serialize), status);
                            let log = format!("{status} {} {username}", &<$Req>::ROUTE);
                            let latency = timer.stop_and_record();
                            // The event macros need a level fixed at compile time, hence one
                            // arm per level.
                            match level {
                                tracing::Level::INFO => {
                                    tracing::info!(
                                        http.latency = latency,
                                        http.status = status.as_u16(),
                                        "{log}"
                                    );
                                }
                                tracing::Level::WARN => {
                                    tracing::warn!(
                                        http.latency = latency,
                                        http.status = status.as_u16(),
                                        "{log}"
                                    );
                                }
                                tracing::Level::ERROR => {
                                    tracing::error!(
                                        http.latency = latency,
                                        http.status = status.as_u16(),
                                        "{log}"
                                    );
                                }
                                _ => {
                                    tracing::debug!(
                                        http.latency = latency,
                                        http.status = status.as_u16(),
                                        "{log}"
                                    );
                                }
                            }

                            response
                        }
                        .instrument(span2)
                        .await
                    }
                    .instrument(span1)
                },
            )
    }};
}
193
194pub fn core_routes<S, A, G, D>(
195 server_state: &Arc<ServerState<S, A, G, D>>,
196) -> impl Filter<Extract = (impl warp::Reply + use<S, A, G, D>,), Error = Rejection>
197+ Clone
198+ use<S, A, G, D>
199where
200 S: StripeClient,
201 A: AppStoreClient,
202 G: GooglePlayClient,
203 D: DocumentService,
204{
205 core_req!(NewAccountRequest, ServerState::new_account, server_state)
206 .or(core_req!(NewAccountRequestV2, ServerState::new_account_v2, server_state))
207 .or(core_req!(ChangeDocRequest, ServerState::change_doc, server_state))
208 .or(core_req!(ChangeDocRequestV2, ServerState::change_doc_v2, server_state))
209 .or(core_req!(UpsertRequest, ServerState::upsert_file_metadata, server_state))
210 .or(core_req!(UpsertRequestV2, ServerState::upsert_file_metadata_v2, server_state))
211 .or(core_req!(GetDocRequest, ServerState::get_document, server_state))
212 .or(core_req!(GetPublicKeyRequest, ServerState::get_public_key, server_state))
213 .or(core_req!(GetUsernameRequest, ServerState::get_username, server_state))
214 .or(core_req!(GetUsageRequest, ServerState::get_usage, server_state))
215 .or(core_req!(GetFileIdsRequest, ServerState::get_file_ids, server_state))
216 .or(core_req!(GetUpdatesRequest, ServerState::get_updates, server_state))
217 .or(core_req!(GetUpdatesRequestV2, ServerState::get_updates_v2, server_state))
218 .or(core_req!(
219 UpgradeAccountGooglePlayRequest,
220 ServerState::upgrade_account_google_play,
221 server_state
222 ))
223 .or(core_req!(
224 UpgradeAccountStripeRequest,
225 ServerState::upgrade_account_stripe,
226 server_state
227 ))
228 .or(core_req!(
229 UpgradeAccountAppStoreRequest,
230 ServerState::upgrade_account_app_store,
231 server_state
232 ))
233 .or(core_req!(CancelSubscriptionRequest, ServerState::cancel_subscription, server_state))
234 .or(core_req!(GetSubscriptionInfoRequest, ServerState::get_subscription_info, server_state))
235 .or(core_req!(DeleteAccountRequest, ServerState::delete_account, server_state))
236 .or(core_req!(UpsertDebugInfoRequest, ServerState::upsert_debug_info, server_state))
237 .or(core_req!(
238 AdminDisappearAccountRequest,
239 ServerState::admin_disappear_account,
240 server_state
241 ))
242 .or(core_req!(AdminDisappearFileRequest, ServerState::admin_disappear_file, server_state))
243 .or(core_req!(AdminListUsersRequest, ServerState::admin_list_users, server_state))
244 .or(core_req!(
245 AdminGetAccountInfoRequest,
246 ServerState::admin_get_account_info,
247 server_state
248 ))
249 .or(core_req!(
250 AdminValidateAccountRequest,
251 ServerState::admin_validate_account,
252 server_state
253 ))
254 .or(core_req!(AdminValidateServerRequest, ServerState::admin_validate_server, server_state))
255 .or(core_req!(AdminFileInfoRequest, ServerState::admin_file_info, server_state))
256 .or(core_req!(AdminRebuildIndexRequest, ServerState::admin_rebuild_index, server_state))
257 .or(core_req!(AdminSetUserTierRequest, ServerState::admin_set_user_tier, server_state))
258}
259
260pub fn build_info() -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
261 warp::get()
262 .and(warp::path(&GetBuildInfoRequest::ROUTE[1..]))
263 .map(|| {
264 let span = span!(
265 Level::INFO,
266 "matched_request",
267 method = &GetBuildInfoRequest::METHOD.as_str(),
268 route = &GetBuildInfoRequest::ROUTE,
269 );
270 let _enter = span.enter();
271 let timer = router_service::HTTP_REQUEST_DURATION_HISTOGRAM
272 .with_label_values(&[GetBuildInfoRequest::ROUTE])
273 .start_timer();
274 let resp = get_build_info();
275 info!("request processed successfully");
276 let resp = warp::reply::json(&resp);
277 timer.observe_duration();
278 resp
279 })
280}
281
282pub fn get_metrics() -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone
283{
284 warp::get().and(warp::path("metrics")).map(|| {
285 let span = span!(Level::INFO, "matched_request", method = "GET", route = "/metrics",);
286 let _enter = span.enter();
287 match TextEncoder::new().encode_to_string(prometheus::gather().as_slice()) {
288 Ok(metrics) => {
289 info!("request processed successfully");
290 metrics
291 }
292 Err(err) => {
293 error!("Error preparing response for prometheus: {:?}", err);
294 String::new()
295 }
296 }
297 })
298}
299
300static STRIPE_WEBHOOK_ROUTE: &str = "stripe-webhooks";
301
302pub fn stripe_webhooks<S, A, G, D>(
303 server_state: &Arc<ServerState<S, A, G, D>>,
304) -> impl Filter<Extract = (impl warp::Reply + use<S, A, G, D>,), Error = warp::Rejection>
305+ Clone
306+ use<S, A, G, D>
307where
308 S: StripeClient,
309 A: AppStoreClient,
310 G: GooglePlayClient,
311 D: DocumentService,
312{
313 let cloned_state = server_state.clone();
314
315 warp::post()
316 .and(warp::path(STRIPE_WEBHOOK_ROUTE))
317 .and(warp::any().map(move || cloned_state.clone()))
318 .and(warp::body::bytes())
319 .and(warp::header::header("Stripe-Signature"))
320 .then(
321 |state: Arc<ServerState<S, A, G, D>>, request: Bytes, stripe_sig: HeaderValue| async move {
322 let span = span!(
323 Level::INFO,
324 "matched_request",
325 method = "POST",
326 route = format!("/{STRIPE_WEBHOOK_ROUTE}").as_str()
327 );
328 let _enter = span.enter();
329 info!("webhook routed");
330 let response = span
331 .in_scope(|| ServerState::stripe_webhooks(&state, request, stripe_sig))
332 .await;
333
334 match response {
335 Ok(_) => warp::reply::with_status("".to_string(), StatusCode::OK),
336 Err(e) => {
337 error!("{:?}", e);
338
339 let status_code = match e {
340 ServerError::ClientError(StripeWebhookError::VerificationError(_))
341 | ServerError::ClientError(StripeWebhookError::InvalidBody(_))
342 | ServerError::ClientError(StripeWebhookError::InvalidHeader(_))
343 | ServerError::ClientError(StripeWebhookError::ParseError(_)) => {
344 StatusCode::BAD_REQUEST
345 }
346 ServerError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
347 };
348
349 warp::reply::with_status("".to_string(), status_code)
350 }
351 }
352 },
353 )
354}
355
356static PLAY_WEBHOOK_ROUTE: &str = "google_play_notification_webhook";
357
358pub fn google_play_notification_webhooks<S, A, G, D>(
359 server_state: &Arc<ServerState<S, A, G, D>>,
360) -> impl Filter<Extract = (impl warp::Reply + use<S, A, G, D>,), Error = warp::Rejection>
361+ Clone
362+ use<S, A, G, D>
363where
364 S: StripeClient,
365 A: AppStoreClient,
366 G: GooglePlayClient,
367 D: DocumentService,
368{
369 let cloned_state = server_state.clone();
370
371 warp::post()
372 .and(warp::path(PLAY_WEBHOOK_ROUTE))
373 .and(warp::any().map(move || cloned_state.clone()))
374 .and(warp::body::bytes())
375 .and(warp::query::query::<HashMap<String, String>>())
376 .then(
377 |state: Arc<ServerState<S, A, G, D>>,
378 request: Bytes,
379 query_parameters: HashMap<String, String>| async move {
380 let span = span!(
381 Level::INFO,
382 "matched_request",
383 method = "POST",
384 route = format!("/{PLAY_WEBHOOK_ROUTE}").as_str()
385 );
386 let _enter = span.enter();
387 info!("webhook routed");
388 let response = span
389 .in_scope(|| state.google_play_notification_webhooks(request, query_parameters))
390 .await;
391 match response {
392 Ok(_) => warp::reply::with_status("".to_string(), StatusCode::OK),
393 Err(e) => {
394 error!("{:?}", e);
395
396 let status_code = match e {
397 ServerError::ClientError(GooglePlayWebhookError::InvalidToken)
398 | ServerError::ClientError(
399 GooglePlayWebhookError::CannotRetrieveData,
400 )
401 | ServerError::ClientError(
402 GooglePlayWebhookError::CannotDecodePubSubData(_),
403 ) => StatusCode::BAD_REQUEST,
404 ServerError::ClientError(
405 GooglePlayWebhookError::CannotRetrieveUserInfo,
406 )
407 | ServerError::ClientError(
408 GooglePlayWebhookError::CannotRetrievePublicKey,
409 )
410 | ServerError::ClientError(GooglePlayWebhookError::CannotParseTime)
411 | ServerError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
412 };
413
414 warp::reply::with_status("".to_string(), status_code)
415 }
416 }
417 },
418 )
419}
420
421static APP_STORE_WEBHOOK_ROUTE: &str = "app_store_notification_webhook";
422pub fn app_store_notification_webhooks<S, A, G, D>(
423 server_state: &Arc<ServerState<S, A, G, D>>,
424) -> impl Filter<Extract = (impl warp::Reply + use<S, A, G, D>,), Error = warp::Rejection>
425+ Clone
426+ use<S, A, G, D>
427where
428 S: StripeClient,
429 A: AppStoreClient,
430 G: GooglePlayClient,
431 D: DocumentService,
432{
433 let cloned_state = server_state.clone();
434
435 warp::post()
436 .and(warp::path(APP_STORE_WEBHOOK_ROUTE))
437 .and(warp::any().map(move || cloned_state.clone()))
438 .and(warp::body::bytes())
439 .then(|state: Arc<ServerState<S, A, G, D>>, body: Bytes| async move {
440 let span = span!(
441 Level::INFO,
442 "matched_request",
443 method = "POST",
444 route = format!("/{APP_STORE_WEBHOOK_ROUTE}").as_str()
445 );
446 let _enter = span.enter();
447 info!("webhook routed");
448 let response = span
449 .in_scope(|| state.app_store_notification_webhook(body))
450 .await;
451
452 match response {
453 Ok(_) => warp::reply::with_status("".to_string(), StatusCode::OK),
454 Err(e) => {
455 error!("{:?}", e);
456
457 let status_code = match e {
458 ServerError::ClientError(AppStoreNotificationError::InvalidJWS) => {
459 StatusCode::BAD_REQUEST
460 }
461 ServerError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
462 };
463
464 warp::reply::with_status("".to_string(), status_code)
465 }
466 }
467 })
468}
469
470pub fn method(name: Method) -> impl Filter<Extract = (), Error = Rejection> + Clone {
471 warp::method()
472 .and(warp::any().map(move || name.clone()))
473 .and_then(|request: Method, intention: Method| async move {
474 if request == intention { Ok(()) } else { Err(reject::not_found()) }
475 })
476 .untuple_one()
477}
478
479pub fn deserialize_and_check<Req>(
480 config: &Config, request: Bytes, version: Option<String>,
481) -> Result<RequestWrapper<Req>, ErrorWrapper<Req::Error>>
482where
483 Req: Request + DeserializeOwned + Serialize,
484{
485 handle_version_header::<Req>(config, &version)?;
486
487 let request = serde_json::from_slice(request.as_ref()).map_err(|err| {
488 warn!("Request parsing failure: {}", err);
489 ErrorWrapper::<Req::Error>::BadRequest
490 })?;
491
492 verify_auth(config, &request).map_err(|err| match err.kind {
493 LbErrKind::Sign(SignError::SignatureExpired(_))
494 | LbErrKind::Sign(SignError::SignatureInTheFuture(_)) => {
495 warn!("expired auth");
496 ErrorWrapper::<Req::Error>::ExpiredAuth
497 }
498 _ => {
499 warn!("invalid auth");
500 ErrorWrapper::<Req::Error>::InvalidAuth
501 }
502 })?;
503
504 Ok(request)
505}