1use crate::billing::app_store_client::AppStoreClient;
2use crate::billing::billing_service::*;
3use crate::billing::google_play_client::GooglePlayClient;
4use crate::billing::stripe_client::StripeClient;
5use crate::config::Config;
6use crate::document_service::DocumentService;
7use crate::utils::get_build_info;
8use crate::{ServerError, ServerState, handle_version_header, router_service, verify_auth};
9use lazy_static::lazy_static;
10use lb_rs::model::api::{ErrorWrapper, Request, RequestWrapper, *};
11use lb_rs::model::errors::{LbErrKind, SignError};
12use prometheus::{
13 CounterVec, HistogramVec, TextEncoder, register_counter_vec, register_histogram_vec,
14};
15use serde::Serialize;
16use serde::de::DeserializeOwned;
17use std::collections::HashMap;
18use std::sync::Arc;
19use tracing::*;
20use warp::http::{HeaderValue, Method, StatusCode};
21use warp::hyper::body::Bytes;
22use warp::{Filter, Rejection, reject};
23
24lazy_static! {
25 pub static ref HTTP_REQUEST_DURATION_HISTOGRAM: HistogramVec = register_histogram_vec!(
26 "lockbook_server_request_duration_seconds",
27 "Lockbook server's HTTP request duration in seconds",
28 &["request"]
29 )
30 .unwrap();
31 pub static ref CORE_VERSION_COUNTER: CounterVec = register_counter_vec!(
32 "lockbook_server_core_version",
33 "Core version request attempts",
34 &["request"]
35 )
36 .unwrap();
37}
38
/// Builds the warp filter that serves one signed core endpoint.
///
/// Arguments:
/// * `$Req` - the request type; its `METHOD` and `ROUTE` associated constants
///   select the HTTP method and path (the leading character of `ROUTE` is skipped).
/// * `$handler` - path to an async function invoked as `$handler(state, context)`
///   whose result is matched against `Ok`, `ServerError::ClientError` and
///   `ServerError::InternalError`.
/// * `$state` - identifier of the shared server state; it is cloned into the filter.
///
/// The expansion refers to `Arc`, `Bytes`, `RequestWrapper`, `ServerState` and the
/// generic parameters `S, A, G, D` without importing them, so all of these must be
/// in scope at the call site.
///
/// Response mapping:
/// * request fails `deserialize_and_check` -> `400` with the serialized error
/// * handler returns `Ok` -> `200` with the serialized response
/// * handler returns `ClientError` -> `400` with `ErrorWrapper::Endpoint`
/// * handler returns `InternalError` -> `500` with `ErrorWrapper::InternalError`
#[macro_export]
macro_rules! core_req {
    ($Req: ty, $handler: path, $state: ident) => {{
        use lb_rs::model::api::{ErrorWrapper, Request};
        use lb_rs::model::file_metadata::Owner;
        use std::net::SocketAddr;
        use tracing::*;
        use $crate::router_service::{self, deserialize_and_check, method};
        use $crate::{RequestContext, ServerError};

        let cloned_state = $state.clone();

        method(<$Req>::METHOD)
            .and(warp::path(&<$Req>::ROUTE[1..]))
            .and(warp::any().map(move || cloned_state.clone()))
            .and(warp::body::bytes())
            .and(warp::header::optional::<String>("Accept-Version"))
            .and(warp::filters::addr::remote())
            .then(
                |state: Arc<ServerState<S, A, G, D>>,
                 request: Bytes,
                 version: Option<String>,
                 ip: Option<SocketAddr>| {
                    let span1 = span!(
                        Level::INFO,
                        "matched_request",
                        http.method = &<$Req>::METHOD.as_str(),
                        http.url = &<$Req>::ROUTE,
                        http.remote_ip = ip
                            .map(|ip| ip.to_string())
                            .unwrap_or_else(|| String::from("unsupported")),
                        core_version = version
                            .clone()
                            .unwrap_or_else(|| String::from("not-present")),
                    );

                    async move {
                        // Reported exactly once per request, and inside `span1` so the
                        // event carries the method/url fields of the request.
                        if ip.is_none() {
                            tracing::error!("ip not present in request");
                        }

                        let state = state.as_ref();
                        // Dropping the timer (e.g. on the early return below) still
                        // records an observation.
                        let timer = router_service::HTTP_REQUEST_DURATION_HISTOGRAM
                            .with_label_values(&[<$Req>::ROUTE])
                            .start_timer();

                        let request: RequestWrapper<$Req> =
                            match deserialize_and_check(&state.config, request, version) {
                                Ok(req) => req,
                                Err(err) => {
                                    warn!("request failed to parse: {:?}", err);
                                    return warp::reply::with_status(
                                        warp::reply::json::<Result<RequestWrapper<$Req>, _>>(&Err(
                                            err,
                                        )),
                                        warp::http::StatusCode::BAD_REQUEST,
                                    );
                                }
                            };

                        let req_pk = request.signed_request.public_key;
                        // The lock guard lives only for this block, so it is released
                        // before the handler runs.
                        let username = {
                            let db = state.index_db.lock().await;
                            match db
                                .accounts
                                .get()
                                .get(&Owner(req_pk))
                                .map(|account| account.username.clone())
                            {
                                Some(username) => username,
                                None => "~unknown~".to_string(),
                            }
                        };
                        let req_pk = base64::encode(req_pk.serialize_compressed());

                        let span2 = span!(
                            Level::INFO,
                            "verified_request_signature",
                            username = username.as_str(),
                            public_key = req_pk.as_str()
                        );
                        let rc: RequestContext<$Req> = RequestContext {
                            request: request.signed_request.timestamped_value.value,
                            public_key: request.signed_request.public_key,
                            ip,
                        };

                        async move {
                            // One match decides the status code, the log level and the
                            // body to serialize.
                            let (status, level, to_serialize) = match $handler(state, rc).await {
                                Ok(response) => (
                                    warp::http::StatusCode::OK,
                                    tracing::Level::INFO,
                                    Ok(response),
                                ),
                                Err(ServerError::ClientError(e)) => (
                                    warp::http::StatusCode::BAD_REQUEST,
                                    tracing::Level::WARN,
                                    Err(ErrorWrapper::Endpoint(e)),
                                ),
                                Err(ServerError::InternalError(e)) => {
                                    // The detail is logged here; the client only sees
                                    // the opaque `InternalError` variant.
                                    tracing::error!("internal: {e}");
                                    (
                                        warp::http::StatusCode::INTERNAL_SERVER_ERROR,
                                        tracing::Level::ERROR,
                                        Err(ErrorWrapper::InternalError),
                                    )
                                }
                            };
                            let response =
                                warp::reply::with_status(warp::reply::json(&to_serialize), status);
                            let log = format!("{status} {} {username}", &<$Req>::ROUTE);
                            let latency = timer.stop_and_record();
                            // `Level` is not an enum, so the wildcard arm is required
                            // for exhaustiveness even though only three levels are set.
                            match level {
                                tracing::Level::INFO => {
                                    tracing::info!(
                                        http.latency = latency,
                                        http.status = status.as_u16(),
                                        "{log}"
                                    );
                                }
                                tracing::Level::WARN => {
                                    tracing::warn!(
                                        http.latency = latency,
                                        http.status = status.as_u16(),
                                        "{log}"
                                    );
                                }
                                tracing::Level::ERROR => {
                                    tracing::error!(
                                        http.latency = latency,
                                        http.status = status.as_u16(),
                                        "{log}"
                                    );
                                }
                                _ => {
                                    tracing::debug!(
                                        http.latency = latency,
                                        http.status = status.as_u16(),
                                        "{log}"
                                    );
                                }
                            }

                            response
                        }
                        .instrument(span2)
                        .await
                    }
                    .instrument(span1)
                },
            )
    }};
}
193
194pub fn core_routes<S, A, G, D>(
195 server_state: &Arc<ServerState<S, A, G, D>>,
196) -> impl Filter<Extract = (impl warp::Reply + use<S, A, G, D>,), Error = Rejection>
197+ Clone
198+ use<S, A, G, D>
199where
200 S: StripeClient,
201 A: AppStoreClient,
202 G: GooglePlayClient,
203 D: DocumentService,
204{
205 core_req!(NewAccountRequestV2, ServerState::new_account_v2, server_state)
206 .or(core_req!(ChangeDocRequestV2, ServerState::change_doc_v2, server_state))
207 .or(core_req!(UpsertRequestV2, ServerState::upsert_file_metadata_v2, server_state))
208 .or(core_req!(GetDocRequest, ServerState::get_document, server_state))
209 .or(core_req!(GetPublicKeyRequest, ServerState::get_public_key, server_state))
210 .or(core_req!(GetUsernameRequest, ServerState::get_username, server_state))
211 .or(core_req!(GetUsageRequest, ServerState::get_usage, server_state))
212 .or(core_req!(GetFileIdsRequest, ServerState::get_file_ids, server_state))
213 .or(core_req!(GetUpdatesRequestV2, ServerState::get_updates_v2, server_state))
214 .or(core_req!(
215 UpgradeAccountGooglePlayRequest,
216 ServerState::upgrade_account_google_play,
217 server_state
218 ))
219 .or(core_req!(
220 UpgradeAccountStripeRequest,
221 ServerState::upgrade_account_stripe,
222 server_state
223 ))
224 .or(core_req!(
225 UpgradeAccountAppStoreRequest,
226 ServerState::upgrade_account_app_store,
227 server_state
228 ))
229 .or(core_req!(CancelSubscriptionRequest, ServerState::cancel_subscription, server_state))
230 .or(core_req!(GetSubscriptionInfoRequest, ServerState::get_subscription_info, server_state))
231 .or(core_req!(DeleteAccountRequest, ServerState::delete_account, server_state))
232 .or(core_req!(UpsertDebugInfoRequest, ServerState::upsert_debug_info, server_state))
233 .or(core_req!(
234 AdminDisappearAccountRequest,
235 ServerState::admin_disappear_account,
236 server_state
237 ))
238 .or(core_req!(AdminDisappearFileRequest, ServerState::admin_disappear_file, server_state))
239 .or(core_req!(AdminListUsersRequest, ServerState::admin_list_users, server_state))
240 .or(core_req!(
241 AdminGetAccountInfoRequest,
242 ServerState::admin_get_account_info,
243 server_state
244 ))
245 .or(core_req!(
246 AdminValidateAccountRequest,
247 ServerState::admin_validate_account,
248 server_state
249 ))
250 .or(core_req!(AdminValidateServerRequest, ServerState::admin_validate_server, server_state))
251 .or(core_req!(AdminFileInfoRequest, ServerState::admin_file_info, server_state))
252 .or(core_req!(AdminRebuildIndexRequest, ServerState::admin_rebuild_index, server_state))
253 .or(core_req!(AdminSetUserTierRequest, ServerState::admin_set_user_tier, server_state))
254}
255
256pub fn build_info() -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
257 warp::get()
258 .and(warp::path(&GetBuildInfoRequest::ROUTE[1..]))
259 .map(|| {
260 let span = span!(
261 Level::INFO,
262 "matched_request",
263 method = &GetBuildInfoRequest::METHOD.as_str(),
264 route = &GetBuildInfoRequest::ROUTE,
265 );
266 let _enter = span.enter();
267 let timer = router_service::HTTP_REQUEST_DURATION_HISTOGRAM
268 .with_label_values(&[GetBuildInfoRequest::ROUTE])
269 .start_timer();
270 let resp = get_build_info();
271 info!("request processed successfully");
272 let resp = warp::reply::json(&resp);
273 timer.observe_duration();
274 resp
275 })
276}
277
278pub fn get_metrics() -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone
279{
280 warp::get().and(warp::path("metrics")).map(|| {
281 let span = span!(Level::INFO, "matched_request", method = "GET", route = "/metrics",);
282 let _enter = span.enter();
283 match TextEncoder::new().encode_to_string(prometheus::gather().as_slice()) {
284 Ok(metrics) => {
285 info!("request processed successfully");
286 metrics
287 }
288 Err(err) => {
289 error!("Error preparing response for prometheus: {:?}", err);
290 String::new()
291 }
292 }
293 })
294}
295
296static STRIPE_WEBHOOK_ROUTE: &str = "stripe-webhooks";
297
298pub fn stripe_webhooks<S, A, G, D>(
299 server_state: &Arc<ServerState<S, A, G, D>>,
300) -> impl Filter<Extract = (impl warp::Reply + use<S, A, G, D>,), Error = warp::Rejection>
301+ Clone
302+ use<S, A, G, D>
303where
304 S: StripeClient,
305 A: AppStoreClient,
306 G: GooglePlayClient,
307 D: DocumentService,
308{
309 let cloned_state = server_state.clone();
310
311 warp::post()
312 .and(warp::path(STRIPE_WEBHOOK_ROUTE))
313 .and(warp::any().map(move || cloned_state.clone()))
314 .and(warp::body::bytes())
315 .and(warp::header::header("Stripe-Signature"))
316 .then(
317 |state: Arc<ServerState<S, A, G, D>>, request: Bytes, stripe_sig: HeaderValue| async move {
318 let span = span!(
319 Level::INFO,
320 "matched_request",
321 method = "POST",
322 route = format!("/{STRIPE_WEBHOOK_ROUTE}").as_str()
323 );
324 let _enter = span.enter();
325 info!("webhook routed");
326 let response = span
327 .in_scope(|| ServerState::stripe_webhooks(&state, request, stripe_sig))
328 .await;
329
330 match response {
331 Ok(_) => warp::reply::with_status("".to_string(), StatusCode::OK),
332 Err(e) => {
333 error!("{:?}", e);
334
335 let status_code = match e {
336 ServerError::ClientError(StripeWebhookError::VerificationError(_))
337 | ServerError::ClientError(StripeWebhookError::InvalidBody(_))
338 | ServerError::ClientError(StripeWebhookError::InvalidHeader(_))
339 | ServerError::ClientError(StripeWebhookError::ParseError(_)) => {
340 StatusCode::BAD_REQUEST
341 }
342 ServerError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
343 };
344
345 warp::reply::with_status("".to_string(), status_code)
346 }
347 }
348 },
349 )
350}
351
352static PLAY_WEBHOOK_ROUTE: &str = "google_play_notification_webhook";
353
354pub fn google_play_notification_webhooks<S, A, G, D>(
355 server_state: &Arc<ServerState<S, A, G, D>>,
356) -> impl Filter<Extract = (impl warp::Reply + use<S, A, G, D>,), Error = warp::Rejection>
357+ Clone
358+ use<S, A, G, D>
359where
360 S: StripeClient,
361 A: AppStoreClient,
362 G: GooglePlayClient,
363 D: DocumentService,
364{
365 let cloned_state = server_state.clone();
366
367 warp::post()
368 .and(warp::path(PLAY_WEBHOOK_ROUTE))
369 .and(warp::any().map(move || cloned_state.clone()))
370 .and(warp::body::bytes())
371 .and(warp::query::query::<HashMap<String, String>>())
372 .then(
373 |state: Arc<ServerState<S, A, G, D>>,
374 request: Bytes,
375 query_parameters: HashMap<String, String>| async move {
376 let span = span!(
377 Level::INFO,
378 "matched_request",
379 method = "POST",
380 route = format!("/{PLAY_WEBHOOK_ROUTE}").as_str()
381 );
382 let _enter = span.enter();
383 info!("webhook routed");
384 let response = span
385 .in_scope(|| state.google_play_notification_webhooks(request, query_parameters))
386 .await;
387 match response {
388 Ok(_) => warp::reply::with_status("".to_string(), StatusCode::OK),
389 Err(e) => {
390 error!("{:?}", e);
391
392 let status_code = match e {
393 ServerError::ClientError(GooglePlayWebhookError::InvalidToken)
394 | ServerError::ClientError(
395 GooglePlayWebhookError::CannotRetrieveData,
396 )
397 | ServerError::ClientError(
398 GooglePlayWebhookError::CannotDecodePubSubData(_),
399 ) => StatusCode::BAD_REQUEST,
400 ServerError::ClientError(
401 GooglePlayWebhookError::CannotRetrieveUserInfo,
402 )
403 | ServerError::ClientError(
404 GooglePlayWebhookError::CannotRetrievePublicKey,
405 )
406 | ServerError::ClientError(GooglePlayWebhookError::CannotParseTime)
407 | ServerError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
408 };
409
410 warp::reply::with_status("".to_string(), status_code)
411 }
412 }
413 },
414 )
415}
416
417static APP_STORE_WEBHOOK_ROUTE: &str = "app_store_notification_webhook";
418pub fn app_store_notification_webhooks<S, A, G, D>(
419 server_state: &Arc<ServerState<S, A, G, D>>,
420) -> impl Filter<Extract = (impl warp::Reply + use<S, A, G, D>,), Error = warp::Rejection>
421+ Clone
422+ use<S, A, G, D>
423where
424 S: StripeClient,
425 A: AppStoreClient,
426 G: GooglePlayClient,
427 D: DocumentService,
428{
429 let cloned_state = server_state.clone();
430
431 warp::post()
432 .and(warp::path(APP_STORE_WEBHOOK_ROUTE))
433 .and(warp::any().map(move || cloned_state.clone()))
434 .and(warp::body::bytes())
435 .then(|state: Arc<ServerState<S, A, G, D>>, body: Bytes| async move {
436 let span = span!(
437 Level::INFO,
438 "matched_request",
439 method = "POST",
440 route = format!("/{APP_STORE_WEBHOOK_ROUTE}").as_str()
441 );
442 let _enter = span.enter();
443 info!("webhook routed");
444 let response = span
445 .in_scope(|| state.app_store_notification_webhook(body))
446 .await;
447
448 match response {
449 Ok(_) => warp::reply::with_status("".to_string(), StatusCode::OK),
450 Err(e) => {
451 error!("{:?}", e);
452
453 let status_code = match e {
454 ServerError::ClientError(AppStoreNotificationError::InvalidJWS) => {
455 StatusCode::BAD_REQUEST
456 }
457 ServerError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
458 };
459
460 warp::reply::with_status("".to_string(), status_code)
461 }
462 }
463 })
464}
465
466pub fn method(name: Method) -> impl Filter<Extract = (), Error = Rejection> + Clone {
467 warp::method()
468 .and(warp::any().map(move || name.clone()))
469 .and_then(|request: Method, intention: Method| async move {
470 if request == intention { Ok(()) } else { Err(reject::not_found()) }
471 })
472 .untuple_one()
473}
474
475pub fn deserialize_and_check<Req>(
476 config: &Config, request: Bytes, version: Option<String>,
477) -> Result<RequestWrapper<Req>, ErrorWrapper<Req::Error>>
478where
479 Req: Request + DeserializeOwned + Serialize,
480{
481 handle_version_header::<Req>(config, &version)?;
482
483 let request = serde_json::from_slice(request.as_ref()).map_err(|err| {
484 warn!("Request parsing failure: {}", err);
485 ErrorWrapper::<Req::Error>::BadRequest
486 })?;
487
488 verify_auth(config, &request).map_err(|err| match err.kind {
489 LbErrKind::Sign(SignError::SignatureExpired(_))
490 | LbErrKind::Sign(SignError::SignatureInTheFuture(_)) => {
491 warn!("expired auth");
492 ErrorWrapper::<Req::Error>::ExpiredAuth
493 }
494 _ => {
495 warn!("invalid auth");
496 ErrorWrapper::<Req::Error>::InvalidAuth
497 }
498 })?;
499
500 Ok(request)
501}