lockbook_server_lib/
router_service.rs

1use crate::billing::app_store_client::AppStoreClient;
2use crate::billing::billing_service::*;
3use crate::billing::google_play_client::GooglePlayClient;
4use crate::billing::stripe_client::StripeClient;
5use crate::config::Config;
6use crate::document_service::DocumentService;
7use crate::utils::get_build_info;
8use crate::{handle_version_header, router_service, verify_auth, ServerError, ServerState};
9use lazy_static::lazy_static;
10use lockbook_shared::api::*;
11use lockbook_shared::api::{ErrorWrapper, Request, RequestWrapper};
12use lockbook_shared::SharedErrorKind;
13use prometheus::{
14    register_counter_vec, register_histogram_vec, CounterVec, HistogramVec, TextEncoder,
15};
16use serde::de::DeserializeOwned;
17use serde::Serialize;
18use std::collections::HashMap;
19use std::sync::Arc;
20use tracing::*;
21use warp::http::{HeaderValue, Method, StatusCode};
22use warp::hyper::body::Bytes;
23use warp::{reject, Filter, Rejection};
24
25lazy_static! {
26    pub static ref HTTP_REQUEST_DURATION_HISTOGRAM: HistogramVec = register_histogram_vec!(
27        "lockbook_server_request_duration_seconds",
28        "Lockbook server's HTTP request duration in seconds",
29        &["request"]
30    )
31    .unwrap();
32    pub static ref CORE_VERSION_COUNTER: CounterVec = register_counter_vec!(
33        "lockbook_server_core_version",
34        "Core version request attempts",
35        &["request"]
36    )
37    .unwrap();
38}
39
40#[macro_export]
41macro_rules! core_req {
42    ($Req: ty, $handler: path, $state: ident) => {{
43        use lockbook_shared::api::{ErrorWrapper, Request};
44        use lockbook_shared::file_metadata::Owner;
45        use tracing::*;
46        use $crate::router_service::{self, deserialize_and_check, method};
47        use $crate::{RequestContext, ServerError};
48
49        let cloned_state = $state.clone();
50
51        method(<$Req>::METHOD)
52            .and(warp::path(&<$Req>::ROUTE[1..]))
53            .and(warp::any().map(move || cloned_state.clone()))
54            .and(warp::body::bytes())
55            .and(warp::header::optional::<String>("Accept-Version"))
56            .then(|state: Arc<ServerState<S, A, G, D>>, request: Bytes, version: Option<String>| {
57                let span1 = span!(
58                    Level::INFO,
59                    "matched_request",
60                    method = &<$Req>::METHOD.as_str(),
61                    route = &<$Req>::ROUTE,
62                );
63                async move {
64                    let state = state.as_ref();
65                    let timer = router_service::HTTP_REQUEST_DURATION_HISTOGRAM
66                        .with_label_values(&[<$Req>::ROUTE])
67                        .start_timer();
68
69                    let request: RequestWrapper<$Req> =
70                        match deserialize_and_check(&state.config, request, version) {
71                            Ok(req) => req,
72                            Err(err) => {
73                                warn!("request failed to parse: {:?}", err);
74                                return warp::reply::json::<Result<RequestWrapper<$Req>, _>>(&Err(
75                                    err,
76                                ));
77                            }
78                        };
79
80                    debug!("request verified successfully");
81                    let req_pk = request.signed_request.public_key;
82                    let username = match state.index_db.lock().map(|db| {
83                        db.accounts
84                            .get()
85                            .get(&Owner(req_pk))
86                            .map(|account| account.username.clone())
87                    }) {
88                        Ok(Some(username)) => username,
89                        Ok(None) => "~unknown~".to_string(),
90                        Err(error) => {
91                            error!(?error, "dbrs error");
92                            "~error~".to_string()
93                        }
94                    };
95                    let req_pk = base64::encode(req_pk.serialize_compressed());
96
97                    let span2 = span!(
98                        Level::INFO,
99                        "verified_request_signature",
100                        username = username.as_str(),
101                        public_key = req_pk.as_str()
102                    );
103                    let rc: RequestContext<$Req> = RequestContext {
104                        request: request.signed_request.timestamped_value.value,
105                        public_key: request.signed_request.public_key,
106                    };
107                    async move {
108                        let to_serialize = match $handler(state, rc).await {
109                            Ok(response) => {
110                                info!("request processed successfully");
111                                Ok(response)
112                            }
113                            Err(ServerError::ClientError(e)) => {
114                                warn!("request rejected due to a client error: {:?}", e);
115                                Err(ErrorWrapper::Endpoint(e))
116                            }
117                            Err(ServerError::InternalError(e)) => {
118                                error!("Internal error {}: {}", <$Req>::ROUTE, e);
119                                Err(ErrorWrapper::InternalError)
120                            }
121                        };
122                        let response = warp::reply::json(&to_serialize);
123                        timer.observe_duration();
124                        response
125                    }
126                    .instrument(span2)
127                    .await
128                }
129                .instrument(span1)
130            })
131    }};
132}
133
134pub fn core_routes<S, A, G, D>(
135    server_state: &Arc<ServerState<S, A, G, D>>,
136) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone
137where
138    S: StripeClient,
139    A: AppStoreClient,
140    G: GooglePlayClient,
141    D: DocumentService,
142{
143    core_req!(NewAccountRequest, ServerState::new_account, server_state)
144        .or(core_req!(ChangeDocRequest, ServerState::change_doc, server_state))
145        .or(core_req!(UpsertRequest, ServerState::upsert_file_metadata, server_state))
146        .or(core_req!(GetDocRequest, ServerState::get_document, server_state))
147        .or(core_req!(GetPublicKeyRequest, ServerState::get_public_key, server_state))
148        .or(core_req!(GetUsernameRequest, ServerState::get_username, server_state))
149        .or(core_req!(GetUsageRequest, ServerState::get_usage, server_state))
150        .or(core_req!(GetFileIdsRequest, ServerState::get_file_ids, server_state))
151        .or(core_req!(GetUpdatesRequest, ServerState::get_updates, server_state))
152        .or(core_req!(
153            UpgradeAccountGooglePlayRequest,
154            ServerState::upgrade_account_google_play,
155            server_state
156        ))
157        .or(core_req!(
158            UpgradeAccountStripeRequest,
159            ServerState::upgrade_account_stripe,
160            server_state
161        ))
162        .or(core_req!(
163            UpgradeAccountAppStoreRequest,
164            ServerState::upgrade_account_app_store,
165            server_state
166        ))
167        .or(core_req!(CancelSubscriptionRequest, ServerState::cancel_subscription, server_state))
168        .or(core_req!(GetSubscriptionInfoRequest, ServerState::get_subscription_info, server_state))
169        .or(core_req!(DeleteAccountRequest, ServerState::delete_account, server_state))
170        .or(core_req!(
171            AdminDisappearAccountRequest,
172            ServerState::admin_disappear_account,
173            server_state
174        ))
175        .or(core_req!(AdminDisappearFileRequest, ServerState::admin_disappear_file, server_state))
176        .or(core_req!(AdminListUsersRequest, ServerState::admin_list_users, server_state))
177        .or(core_req!(
178            AdminGetAccountInfoRequest,
179            ServerState::admin_get_account_info,
180            server_state
181        ))
182        .or(core_req!(
183            AdminValidateAccountRequest,
184            ServerState::admin_validate_account,
185            server_state
186        ))
187        .or(core_req!(AdminValidateServerRequest, ServerState::admin_validate_server, server_state))
188        .or(core_req!(AdminFileInfoRequest, ServerState::admin_file_info, server_state))
189        .or(core_req!(AdminRebuildIndexRequest, ServerState::admin_rebuild_index, server_state))
190        .or(core_req!(AdminSetUserTierRequest, ServerState::admin_set_user_tier, server_state))
191}
192
193pub fn build_info() -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
194    warp::get()
195        .and(warp::path(&GetBuildInfoRequest::ROUTE[1..]))
196        .map(|| {
197            let span = span!(
198                Level::INFO,
199                "matched_request",
200                method = &GetBuildInfoRequest::METHOD.as_str(),
201                route = &GetBuildInfoRequest::ROUTE,
202            );
203            let _enter = span.enter();
204            let timer = router_service::HTTP_REQUEST_DURATION_HISTOGRAM
205                .with_label_values(&[GetBuildInfoRequest::ROUTE])
206                .start_timer();
207            let resp = get_build_info();
208            info!("request processed successfully");
209            let resp = warp::reply::json(&resp);
210            timer.observe_duration();
211            resp
212        })
213}
214
215pub fn get_metrics() -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone
216{
217    warp::get().and(warp::path("metrics")).map(|| {
218        let span = span!(Level::INFO, "matched_request", method = "GET", route = "/metrics",);
219        let _enter = span.enter();
220        match TextEncoder::new().encode_to_string(prometheus::gather().as_slice()) {
221            Ok(metrics) => {
222                info!("request processed successfully");
223                metrics
224            }
225            Err(err) => {
226                error!("Error preparing response for prometheus: {:?}", err);
227                String::new()
228            }
229        }
230    })
231}
232
233static STRIPE_WEBHOOK_ROUTE: &str = "stripe-webhooks";
234
235pub fn stripe_webhooks<S, A, G, D>(
236    server_state: &Arc<ServerState<S, A, G, D>>,
237) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone
238where
239    S: StripeClient,
240    A: AppStoreClient,
241    G: GooglePlayClient,
242    D: DocumentService,
243{
244    let cloned_state = server_state.clone();
245
246    warp::post()
247        .and(warp::path(STRIPE_WEBHOOK_ROUTE))
248        .and(warp::any().map(move || cloned_state.clone()))
249        .and(warp::body::bytes())
250        .and(warp::header::header("Stripe-Signature"))
251        .then(
252            |state: Arc<ServerState<S, A, G, D>>, request: Bytes, stripe_sig: HeaderValue| async move {
253                let span = span!(
254                    Level::INFO,
255                    "matched_request",
256                    method = "POST",
257                    route = format!("/{}", STRIPE_WEBHOOK_ROUTE).as_str()
258                );
259                let _enter = span.enter();
260                info!("webhook routed");
261                let response = span
262                    .in_scope(|| ServerState::stripe_webhooks(&state, request, stripe_sig))
263                    .await;
264
265                match response {
266                    Ok(_) => warp::reply::with_status("".to_string(), StatusCode::OK),
267                    Err(e) => {
268                        error!("{:?}", e);
269
270                        let status_code = match e {
271                            ServerError::ClientError(StripeWebhookError::VerificationError(_))
272                            | ServerError::ClientError(StripeWebhookError::InvalidBody(_))
273                            | ServerError::ClientError(StripeWebhookError::InvalidHeader(_))
274                            | ServerError::ClientError(StripeWebhookError::ParseError(_)) => {
275                                StatusCode::BAD_REQUEST
276                            }
277                            ServerError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
278                        };
279
280                        warp::reply::with_status("".to_string(), status_code)
281                    }
282                }
283            },
284        )
285}
286
287static PLAY_WEBHOOK_ROUTE: &str = "google_play_notification_webhook";
288
289pub fn google_play_notification_webhooks<S, A, G, D>(
290    server_state: &Arc<ServerState<S, A, G, D>>,
291) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone
292where
293    S: StripeClient,
294    A: AppStoreClient,
295    G: GooglePlayClient,
296    D: DocumentService,
297{
298    let cloned_state = server_state.clone();
299
300    warp::post()
301        .and(warp::path(PLAY_WEBHOOK_ROUTE))
302        .and(warp::any().map(move || cloned_state.clone()))
303        .and(warp::body::bytes())
304        .and(warp::query::query::<HashMap<String, String>>())
305        .then(
306            |state: Arc<ServerState<S, A, G, D>>,
307             request: Bytes,
308             query_parameters: HashMap<String, String>| async move {
309                let span = span!(
310                    Level::INFO,
311                    "matched_request",
312                    method = "POST",
313                    route = format!("/{}", PLAY_WEBHOOK_ROUTE).as_str()
314                );
315                let _enter = span.enter();
316                info!("webhook routed");
317                let response = span
318                    .in_scope(|| state.google_play_notification_webhooks(request, query_parameters))
319                    .await;
320                match response {
321                    Ok(_) => warp::reply::with_status("".to_string(), StatusCode::OK),
322                    Err(e) => {
323                        error!("{:?}", e);
324
325                        let status_code = match e {
326                            ServerError::ClientError(GooglePlayWebhookError::InvalidToken)
327                            | ServerError::ClientError(
328                                GooglePlayWebhookError::CannotRetrieveData,
329                            )
330                            | ServerError::ClientError(
331                                GooglePlayWebhookError::CannotDecodePubSubData(_),
332                            ) => StatusCode::BAD_REQUEST,
333                            ServerError::ClientError(
334                                GooglePlayWebhookError::CannotRetrieveUserInfo,
335                            )
336                            | ServerError::ClientError(
337                                GooglePlayWebhookError::CannotRetrievePublicKey,
338                            )
339                            | ServerError::ClientError(GooglePlayWebhookError::CannotParseTime)
340                            | ServerError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
341                        };
342
343                        warp::reply::with_status("".to_string(), status_code)
344                    }
345                }
346            },
347        )
348}
349
350static APP_STORE_WEBHOOK_ROUTE: &str = "app_store_notification_webhook";
351pub fn app_store_notification_webhooks<S, A, G, D>(
352    server_state: &Arc<ServerState<S, A, G, D>>,
353) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone
354where
355    S: StripeClient,
356    A: AppStoreClient,
357    G: GooglePlayClient,
358    D: DocumentService,
359{
360    let cloned_state = server_state.clone();
361
362    warp::post()
363        .and(warp::path(APP_STORE_WEBHOOK_ROUTE))
364        .and(warp::any().map(move || cloned_state.clone()))
365        .and(warp::body::bytes())
366        .then(|state: Arc<ServerState<S, A, G, D>>, body: Bytes| async move {
367            let span = span!(
368                Level::INFO,
369                "matched_request",
370                method = "POST",
371                route = format!("/{}", APP_STORE_WEBHOOK_ROUTE).as_str()
372            );
373            let _enter = span.enter();
374            info!("webhook routed");
375            let response = span
376                .in_scope(|| state.app_store_notification_webhook(body))
377                .await;
378
379            match response {
380                Ok(_) => warp::reply::with_status("".to_string(), StatusCode::OK),
381                Err(e) => {
382                    error!("{:?}", e);
383
384                    let status_code = match e {
385                        ServerError::ClientError(AppStoreNotificationError::InvalidJWS) => {
386                            StatusCode::BAD_REQUEST
387                        }
388                        ServerError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
389                    };
390
391                    warp::reply::with_status("".to_string(), status_code)
392                }
393            }
394        })
395}
396
397pub fn method(name: Method) -> impl Filter<Extract = (), Error = Rejection> + Clone {
398    warp::method()
399        .and(warp::any().map(move || name.clone()))
400        .and_then(|request: Method, intention: Method| async move {
401            if request == intention {
402                Ok(())
403            } else {
404                Err(reject::not_found())
405            }
406        })
407        .untuple_one()
408}
409
410pub fn deserialize_and_check<Req>(
411    config: &Config, request: Bytes, version: Option<String>,
412) -> Result<RequestWrapper<Req>, ErrorWrapper<Req::Error>>
413where
414    Req: Request + DeserializeOwned + Serialize,
415{
416    handle_version_header::<Req>(config, &version)?;
417
418    let request = serde_json::from_slice(request.as_ref()).map_err(|err| {
419        warn!("Request parsing failure: {}", err);
420        ErrorWrapper::<Req::Error>::BadRequest
421    })?;
422
423    verify_auth(config, &request).map_err(|err| match err.kind {
424        SharedErrorKind::SignatureExpired(_) | SharedErrorKind::SignatureInTheFuture(_) => {
425            warn!("expired auth");
426            ErrorWrapper::<Req::Error>::ExpiredAuth
427        }
428        _ => {
429            warn!("invalid auth");
430            ErrorWrapper::<Req::Error>::InvalidAuth
431        }
432    })?;
433
434    Ok(request)
435}