1use crate::billing::app_store_client::AppStoreClient;
2use crate::billing::billing_service::*;
3use crate::billing::google_play_client::GooglePlayClient;
4use crate::billing::stripe_client::StripeClient;
5use crate::config::Config;
6use crate::document_service::DocumentService;
7use crate::utils::get_build_info;
8use crate::{handle_version_header, router_service, verify_auth, ServerError, ServerState};
9use lazy_static::lazy_static;
10use lockbook_shared::api::*;
11use lockbook_shared::api::{ErrorWrapper, Request, RequestWrapper};
12use lockbook_shared::SharedErrorKind;
13use prometheus::{
14 register_counter_vec, register_histogram_vec, CounterVec, HistogramVec, TextEncoder,
15};
16use serde::de::DeserializeOwned;
17use serde::Serialize;
18use std::collections::HashMap;
19use std::sync::Arc;
20use tracing::*;
21use warp::http::{HeaderValue, Method, StatusCode};
22use warp::hyper::body::Bytes;
23use warp::{reject, Filter, Rejection};
24
25lazy_static! {
26 pub static ref HTTP_REQUEST_DURATION_HISTOGRAM: HistogramVec = register_histogram_vec!(
27 "lockbook_server_request_duration_seconds",
28 "Lockbook server's HTTP request duration in seconds",
29 &["request"]
30 )
31 .unwrap();
32 pub static ref CORE_VERSION_COUNTER: CounterVec = register_counter_vec!(
33 "lockbook_server_core_version",
34 "Core version request attempts",
35 &["request"]
36 )
37 .unwrap();
38}
39
/// Builds the warp filter for one signed core endpoint.
///
/// * `$Req` - request type; its `METHOD` and `ROUTE` constants select the
///   HTTP method and path the filter matches.
/// * `$handler` - async function called as `$handler(&ServerState, RequestContext<$Req>)`.
/// * `$state` - identifier of the `Arc<ServerState<S, A, G, D>>` to clone into the filter.
///
/// The expansion refers to `Arc`, `ServerState`, `Bytes`, `RequestWrapper`,
/// `warp`, `base64` and the generic parameters `S, A, G, D` unqualified, so
/// all of them must resolve at the call site.
///
/// Every outcome is answered as a JSON-serialized `Result`: parse/auth
/// failures and handler errors become `Err(ErrorWrapper::..)`, successes
/// become `Ok(response)`.
#[macro_export]
macro_rules! core_req {
    ($Req: ty, $handler: path, $state: ident) => {{
        use lockbook_shared::api::{ErrorWrapper, Request};
        use lockbook_shared::file_metadata::Owner;
        use tracing::*;
        use $crate::router_service::{self, deserialize_and_check, method};
        use $crate::{RequestContext, ServerError};

        let shared_state = $state.clone();

        method(<$Req>::METHOD)
            .and(warp::path(&<$Req>::ROUTE[1..]))
            .and(warp::any().map(move || shared_state.clone()))
            .and(warp::body::bytes())
            .and(warp::header::optional::<String>("Accept-Version"))
            .then(|state: Arc<ServerState<S, A, G, D>>, body: Bytes, version: Option<String>| {
                // Outer span: covers the whole request, including parsing and auth.
                let route_span = span!(
                    Level::INFO,
                    "matched_request",
                    method = &<$Req>::METHOD.as_str(),
                    route = &<$Req>::ROUTE,
                );
                async move {
                    let state = state.as_ref();
                    // Dropping the timer on the early return below still records
                    // the observation; the success path observes explicitly.
                    let timer = router_service::HTTP_REQUEST_DURATION_HISTOGRAM
                        .with_label_values(&[<$Req>::ROUTE])
                        .start_timer();

                    let wrapper: RequestWrapper<$Req> =
                        match deserialize_and_check(&state.config, body, version) {
                            Ok(wrapper) => wrapper,
                            Err(err) => {
                                warn!("request failed to parse: {:?}", err);
                                return warp::reply::json::<Result<RequestWrapper<$Req>, _>>(&Err(
                                    err,
                                ));
                            }
                        };

                    debug!("request verified successfully");
                    let signer = wrapper.signed_request.public_key;

                    // The lock result is matched as a temporary on purpose: nothing
                    // derived from the guard stays in scope once the match ends.
                    let username = match state.index_db.lock().map(|db| {
                        db.accounts
                            .get()
                            .get(&Owner(signer))
                            .map(|account| account.username.clone())
                    }) {
                        Ok(found) => found.unwrap_or_else(|| "~unknown~".to_string()),
                        Err(error) => {
                            error!(?error, "dbrs error");
                            "~error~".to_string()
                        }
                    };
                    let encoded_key = base64::encode(signer.serialize_compressed());

                    // Inner span: adds the caller's identity to everything the handler logs.
                    let caller_span = span!(
                        Level::INFO,
                        "verified_request_signature",
                        username = username.as_str(),
                        public_key = encoded_key.as_str()
                    );
                    let context: RequestContext<$Req> = RequestContext {
                        request: wrapper.signed_request.timestamped_value.value,
                        public_key: signer,
                    };
                    async move {
                        let outcome = match $handler(state, context).await {
                            Ok(response) => {
                                info!("request processed successfully");
                                Ok(response)
                            }
                            Err(ServerError::ClientError(e)) => {
                                warn!("request rejected due to a client error: {:?}", e);
                                Err(ErrorWrapper::Endpoint(e))
                            }
                            Err(ServerError::InternalError(e)) => {
                                error!("Internal error {}: {}", <$Req>::ROUTE, e);
                                Err(ErrorWrapper::InternalError)
                            }
                        };
                        let reply = warp::reply::json(&outcome);
                        timer.observe_duration();
                        reply
                    }
                    .instrument(caller_span)
                    .await
                }
                .instrument(route_span)
            })
    }};
}
133
134pub fn core_routes<S, A, G, D>(
135 server_state: &Arc<ServerState<S, A, G, D>>,
136) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone
137where
138 S: StripeClient,
139 A: AppStoreClient,
140 G: GooglePlayClient,
141 D: DocumentService,
142{
143 core_req!(NewAccountRequest, ServerState::new_account, server_state)
144 .or(core_req!(ChangeDocRequest, ServerState::change_doc, server_state))
145 .or(core_req!(UpsertRequest, ServerState::upsert_file_metadata, server_state))
146 .or(core_req!(GetDocRequest, ServerState::get_document, server_state))
147 .or(core_req!(GetPublicKeyRequest, ServerState::get_public_key, server_state))
148 .or(core_req!(GetUsernameRequest, ServerState::get_username, server_state))
149 .or(core_req!(GetUsageRequest, ServerState::get_usage, server_state))
150 .or(core_req!(GetFileIdsRequest, ServerState::get_file_ids, server_state))
151 .or(core_req!(GetUpdatesRequest, ServerState::get_updates, server_state))
152 .or(core_req!(
153 UpgradeAccountGooglePlayRequest,
154 ServerState::upgrade_account_google_play,
155 server_state
156 ))
157 .or(core_req!(
158 UpgradeAccountStripeRequest,
159 ServerState::upgrade_account_stripe,
160 server_state
161 ))
162 .or(core_req!(
163 UpgradeAccountAppStoreRequest,
164 ServerState::upgrade_account_app_store,
165 server_state
166 ))
167 .or(core_req!(CancelSubscriptionRequest, ServerState::cancel_subscription, server_state))
168 .or(core_req!(GetSubscriptionInfoRequest, ServerState::get_subscription_info, server_state))
169 .or(core_req!(DeleteAccountRequest, ServerState::delete_account, server_state))
170 .or(core_req!(
171 AdminDisappearAccountRequest,
172 ServerState::admin_disappear_account,
173 server_state
174 ))
175 .or(core_req!(AdminDisappearFileRequest, ServerState::admin_disappear_file, server_state))
176 .or(core_req!(AdminListUsersRequest, ServerState::admin_list_users, server_state))
177 .or(core_req!(
178 AdminGetAccountInfoRequest,
179 ServerState::admin_get_account_info,
180 server_state
181 ))
182 .or(core_req!(
183 AdminValidateAccountRequest,
184 ServerState::admin_validate_account,
185 server_state
186 ))
187 .or(core_req!(AdminValidateServerRequest, ServerState::admin_validate_server, server_state))
188 .or(core_req!(AdminFileInfoRequest, ServerState::admin_file_info, server_state))
189 .or(core_req!(AdminRebuildIndexRequest, ServerState::admin_rebuild_index, server_state))
190 .or(core_req!(AdminSetUserTierRequest, ServerState::admin_set_user_tier, server_state))
191}
192
193pub fn build_info() -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
194 warp::get()
195 .and(warp::path(&GetBuildInfoRequest::ROUTE[1..]))
196 .map(|| {
197 let span = span!(
198 Level::INFO,
199 "matched_request",
200 method = &GetBuildInfoRequest::METHOD.as_str(),
201 route = &GetBuildInfoRequest::ROUTE,
202 );
203 let _enter = span.enter();
204 let timer = router_service::HTTP_REQUEST_DURATION_HISTOGRAM
205 .with_label_values(&[GetBuildInfoRequest::ROUTE])
206 .start_timer();
207 let resp = get_build_info();
208 info!("request processed successfully");
209 let resp = warp::reply::json(&resp);
210 timer.observe_duration();
211 resp
212 })
213}
214
215pub fn get_metrics() -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone
216{
217 warp::get().and(warp::path("metrics")).map(|| {
218 let span = span!(Level::INFO, "matched_request", method = "GET", route = "/metrics",);
219 let _enter = span.enter();
220 match TextEncoder::new().encode_to_string(prometheus::gather().as_slice()) {
221 Ok(metrics) => {
222 info!("request processed successfully");
223 metrics
224 }
225 Err(err) => {
226 error!("Error preparing response for prometheus: {:?}", err);
227 String::new()
228 }
229 }
230 })
231}
232
233static STRIPE_WEBHOOK_ROUTE: &str = "stripe-webhooks";
234
235pub fn stripe_webhooks<S, A, G, D>(
236 server_state: &Arc<ServerState<S, A, G, D>>,
237) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone
238where
239 S: StripeClient,
240 A: AppStoreClient,
241 G: GooglePlayClient,
242 D: DocumentService,
243{
244 let cloned_state = server_state.clone();
245
246 warp::post()
247 .and(warp::path(STRIPE_WEBHOOK_ROUTE))
248 .and(warp::any().map(move || cloned_state.clone()))
249 .and(warp::body::bytes())
250 .and(warp::header::header("Stripe-Signature"))
251 .then(
252 |state: Arc<ServerState<S, A, G, D>>, request: Bytes, stripe_sig: HeaderValue| async move {
253 let span = span!(
254 Level::INFO,
255 "matched_request",
256 method = "POST",
257 route = format!("/{}", STRIPE_WEBHOOK_ROUTE).as_str()
258 );
259 let _enter = span.enter();
260 info!("webhook routed");
261 let response = span
262 .in_scope(|| ServerState::stripe_webhooks(&state, request, stripe_sig))
263 .await;
264
265 match response {
266 Ok(_) => warp::reply::with_status("".to_string(), StatusCode::OK),
267 Err(e) => {
268 error!("{:?}", e);
269
270 let status_code = match e {
271 ServerError::ClientError(StripeWebhookError::VerificationError(_))
272 | ServerError::ClientError(StripeWebhookError::InvalidBody(_))
273 | ServerError::ClientError(StripeWebhookError::InvalidHeader(_))
274 | ServerError::ClientError(StripeWebhookError::ParseError(_)) => {
275 StatusCode::BAD_REQUEST
276 }
277 ServerError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
278 };
279
280 warp::reply::with_status("".to_string(), status_code)
281 }
282 }
283 },
284 )
285}
286
287static PLAY_WEBHOOK_ROUTE: &str = "google_play_notification_webhook";
288
289pub fn google_play_notification_webhooks<S, A, G, D>(
290 server_state: &Arc<ServerState<S, A, G, D>>,
291) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone
292where
293 S: StripeClient,
294 A: AppStoreClient,
295 G: GooglePlayClient,
296 D: DocumentService,
297{
298 let cloned_state = server_state.clone();
299
300 warp::post()
301 .and(warp::path(PLAY_WEBHOOK_ROUTE))
302 .and(warp::any().map(move || cloned_state.clone()))
303 .and(warp::body::bytes())
304 .and(warp::query::query::<HashMap<String, String>>())
305 .then(
306 |state: Arc<ServerState<S, A, G, D>>,
307 request: Bytes,
308 query_parameters: HashMap<String, String>| async move {
309 let span = span!(
310 Level::INFO,
311 "matched_request",
312 method = "POST",
313 route = format!("/{}", PLAY_WEBHOOK_ROUTE).as_str()
314 );
315 let _enter = span.enter();
316 info!("webhook routed");
317 let response = span
318 .in_scope(|| state.google_play_notification_webhooks(request, query_parameters))
319 .await;
320 match response {
321 Ok(_) => warp::reply::with_status("".to_string(), StatusCode::OK),
322 Err(e) => {
323 error!("{:?}", e);
324
325 let status_code = match e {
326 ServerError::ClientError(GooglePlayWebhookError::InvalidToken)
327 | ServerError::ClientError(
328 GooglePlayWebhookError::CannotRetrieveData,
329 )
330 | ServerError::ClientError(
331 GooglePlayWebhookError::CannotDecodePubSubData(_),
332 ) => StatusCode::BAD_REQUEST,
333 ServerError::ClientError(
334 GooglePlayWebhookError::CannotRetrieveUserInfo,
335 )
336 | ServerError::ClientError(
337 GooglePlayWebhookError::CannotRetrievePublicKey,
338 )
339 | ServerError::ClientError(GooglePlayWebhookError::CannotParseTime)
340 | ServerError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
341 };
342
343 warp::reply::with_status("".to_string(), status_code)
344 }
345 }
346 },
347 )
348}
349
350static APP_STORE_WEBHOOK_ROUTE: &str = "app_store_notification_webhook";
351pub fn app_store_notification_webhooks<S, A, G, D>(
352 server_state: &Arc<ServerState<S, A, G, D>>,
353) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone
354where
355 S: StripeClient,
356 A: AppStoreClient,
357 G: GooglePlayClient,
358 D: DocumentService,
359{
360 let cloned_state = server_state.clone();
361
362 warp::post()
363 .and(warp::path(APP_STORE_WEBHOOK_ROUTE))
364 .and(warp::any().map(move || cloned_state.clone()))
365 .and(warp::body::bytes())
366 .then(|state: Arc<ServerState<S, A, G, D>>, body: Bytes| async move {
367 let span = span!(
368 Level::INFO,
369 "matched_request",
370 method = "POST",
371 route = format!("/{}", APP_STORE_WEBHOOK_ROUTE).as_str()
372 );
373 let _enter = span.enter();
374 info!("webhook routed");
375 let response = span
376 .in_scope(|| state.app_store_notification_webhook(body))
377 .await;
378
379 match response {
380 Ok(_) => warp::reply::with_status("".to_string(), StatusCode::OK),
381 Err(e) => {
382 error!("{:?}", e);
383
384 let status_code = match e {
385 ServerError::ClientError(AppStoreNotificationError::InvalidJWS) => {
386 StatusCode::BAD_REQUEST
387 }
388 ServerError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
389 };
390
391 warp::reply::with_status("".to_string(), status_code)
392 }
393 }
394 })
395}
396
397pub fn method(name: Method) -> impl Filter<Extract = (), Error = Rejection> + Clone {
398 warp::method()
399 .and(warp::any().map(move || name.clone()))
400 .and_then(|request: Method, intention: Method| async move {
401 if request == intention {
402 Ok(())
403 } else {
404 Err(reject::not_found())
405 }
406 })
407 .untuple_one()
408}
409
410pub fn deserialize_and_check<Req>(
411 config: &Config, request: Bytes, version: Option<String>,
412) -> Result<RequestWrapper<Req>, ErrorWrapper<Req::Error>>
413where
414 Req: Request + DeserializeOwned + Serialize,
415{
416 handle_version_header::<Req>(config, &version)?;
417
418 let request = serde_json::from_slice(request.as_ref()).map_err(|err| {
419 warn!("Request parsing failure: {}", err);
420 ErrorWrapper::<Req::Error>::BadRequest
421 })?;
422
423 verify_auth(config, &request).map_err(|err| match err.kind {
424 SharedErrorKind::SignatureExpired(_) | SharedErrorKind::SignatureInTheFuture(_) => {
425 warn!("expired auth");
426 ErrorWrapper::<Req::Error>::ExpiredAuth
427 }
428 _ => {
429 warn!("invalid auth");
430 ErrorWrapper::<Req::Error>::InvalidAuth
431 }
432 })?;
433
434 Ok(request)
435}