Skip to main content

lockbook_server_lib/
router_service.rs

1use crate::billing::app_store_client::AppStoreClient;
2use crate::billing::billing_service::*;
3use crate::billing::google_play_client::GooglePlayClient;
4use crate::billing::stripe_client::StripeClient;
5use crate::config::Config;
6use crate::document_service::DocumentService;
7use crate::utils::get_build_info;
8use crate::{ServerError, ServerState, handle_version_header, router_service, verify_auth};
9use lazy_static::lazy_static;
10use lb_rs::model::api::{ErrorWrapper, Request, RequestWrapper, *};
11use lb_rs::model::errors::{LbErrKind, SignError};
12use prometheus::{
13    CounterVec, HistogramVec, TextEncoder, register_counter_vec, register_histogram_vec,
14};
15use serde::Serialize;
16use serde::de::DeserializeOwned;
17use std::collections::HashMap;
18use std::sync::Arc;
19use tracing::*;
20use warp::http::{HeaderValue, Method, StatusCode};
21use warp::hyper::body::Bytes;
22use warp::{Filter, Rejection, reject};
23
lazy_static! {
    /// Prometheus histogram named `lockbook_server_request_duration_seconds`, with a single
    /// `request` label. In this file the label value is always a request type's `ROUTE`.
    ///
    /// Registration happens on first access; the `unwrap` panics if registration fails.
    pub static ref HTTP_REQUEST_DURATION_HISTOGRAM: HistogramVec = register_histogram_vec!(
        "lockbook_server_request_duration_seconds",
        "Lockbook server's HTTP request duration in seconds",
        &["request"]
    )
    .unwrap();
    /// Prometheus counter named `lockbook_server_core_version`, with a single `request` label.
    ///
    /// Nothing in this file increments it.
    /// NOTE(review): presumably updated by the version-header handling elsewhere — confirm.
    pub static ref CORE_VERSION_COUNTER: CounterVec = register_counter_vec!(
        "lockbook_server_core_version",
        "Core version request attempts",
        &["request"]
    )
    .unwrap();
}
38
/// Builds the warp filter that serves one signed API endpoint.
///
/// # Arguments
/// - `$Req`: the request type. Its `METHOD` and `ROUTE` associated constants select the HTTP
///   method and the path (the leading character of `ROUTE` is stripped before matching).
/// - `$handler`: path to an async function invoked as `$handler(state, RequestContext<$Req>)`
///   whose output is matched against `Ok(_)`, `Err(ServerError::ClientError(_))` and
///   `Err(ServerError::InternalError(_))`.
/// - `$state`: identifier of a value whose `.clone()` yields an `Arc<ServerState<S, A, G, D>>`.
///
/// # Call-site requirements
/// The expansion refers to `Arc`, `Bytes`, `RequestWrapper`, `warp`, `base64` and the type
/// parameters `S, A, G, D` without importing them, so all of them must be in scope where the
/// macro is invoked.
///
/// # Responses
/// - body fails `deserialize_and_check`: `400` with the JSON-encoded `Err(..)`
/// - handler returns `Ok`: `200` with the JSON-encoded `Ok(response)`
/// - handler returns `ServerError::ClientError(e)`: `400` with `Err(ErrorWrapper::Endpoint(e))`
/// - handler returns `ServerError::InternalError(e)`: `500` with `Err(ErrorWrapper::InternalError)`
///
/// Every request is wrapped in a `matched_request` span; once the body has been parsed, the
/// handler additionally runs inside a `verified_request_signature` span carrying the username
/// and public key.
#[macro_export]
macro_rules! core_req {
    ($Req: ty, $handler: path, $state: ident) => {{
        use lb_rs::model::api::{ErrorWrapper, Request};
        use lb_rs::model::file_metadata::Owner;
        use std::net::SocketAddr;
        use tracing::*;
        use $crate::router_service::{self, deserialize_and_check, method};
        use $crate::{RequestContext, ServerError};

        let cloned_state = $state.clone();

        method(<$Req>::METHOD)
            .and(warp::path(&<$Req>::ROUTE[1..]))
            .and(warp::any().map(move || cloned_state.clone()))
            .and(warp::body::bytes())
            .and(warp::header::optional::<String>("Accept-Version"))
            .and(warp::filters::addr::remote())
            .then(
                |state: Arc<ServerState<S, A, G, D>>,
                 request: Bytes,
                 version: Option<String>,
                 ip: Option<SocketAddr>| {
                    let span1 = span!(
                        Level::INFO,
                        "matched_request",
                        http.method = &<$Req>::METHOD.as_str(),
                        http.url = &<$Req>::ROUTE,
                        http.remote_ip = ip
                            .map(|ip| ip.to_string())
                            .unwrap_or_else(|| String::from("unsupported")),
                        core_version = version
                            .clone()
                            .unwrap_or_else(|| String::from("not-present")),
                    );

                    async move {
                        // Reported exactly once per request, and from inside `span1` so the
                        // event carries the method/url fields of the request it belongs to.
                        if ip.is_none() {
                            tracing::error!("ip not present in request");
                        }

                        let state = state.as_ref();
                        let timer = router_service::HTTP_REQUEST_DURATION_HISTOGRAM
                            .with_label_values(&[<$Req>::ROUTE])
                            .start_timer();

                        let request: RequestWrapper<$Req> =
                            match deserialize_and_check(&state.config, request, version) {
                                Ok(req) => req,
                                Err(err) => {
                                    warn!("request failed to parse: {:?}", err);
                                    return warp::reply::with_status(
                                        warp::reply::json::<Result<RequestWrapper<$Req>, _>>(&Err(
                                            err,
                                        )),
                                        warp::http::StatusCode::BAD_REQUEST,
                                    );
                                }
                            };

                        let req_pk = request.signed_request.public_key;
                        // The lock guard lives only for this block, so it is released before
                        // the handler runs.
                        let username = {
                            let db = state.index_db.lock().await;
                            db.accounts
                                .get()
                                .get(&Owner(req_pk))
                                .map(|account| account.username.clone())
                                .unwrap_or_else(|| "~unknown~".to_string())
                        };
                        let req_pk = base64::encode(req_pk.serialize_compressed());

                        let span2 = span!(
                            Level::INFO,
                            "verified_request_signature",
                            username = username.as_str(),
                            public_key = req_pk.as_str()
                        );
                        let rc: RequestContext<$Req> = RequestContext {
                            request: request.signed_request.timestamped_value.value,
                            public_key: request.signed_request.public_key,
                            ip,
                        };

                        async move {
                            let status;
                            let mut level = tracing::Level::INFO;
                            let to_serialize = match $handler(state, rc).await {
                                Ok(response) => {
                                    status = warp::http::StatusCode::OK;
                                    Ok(response)
                                }
                                Err(ServerError::ClientError(e)) => {
                                    status = warp::http::StatusCode::BAD_REQUEST;
                                    level = tracing::Level::WARN;
                                    Err(ErrorWrapper::Endpoint(e))
                                }
                                Err(ServerError::InternalError(e)) => {
                                    status = warp::http::StatusCode::INTERNAL_SERVER_ERROR;
                                    level = tracing::Level::ERROR;
                                    // The detail is logged here only; the client receives the
                                    // opaque `InternalError` variant.
                                    tracing::error!("internal: {e}");
                                    Err(ErrorWrapper::InternalError)
                                }
                            };
                            let response =
                                warp::reply::with_status(warp::reply::json(&to_serialize), status);
                            let log = format!("{status} {} {username}", &<$Req>::ROUTE);
                            let latency = timer.stop_and_record();
                            // The tracing event macros need a constant level, hence one arm
                            // per level instead of a single call with a runtime level.
                            match level {
                                tracing::Level::INFO => {
                                    tracing::info!(
                                        http.latency = latency,
                                        http.status = status.as_u16(),
                                        "{log}"
                                    );
                                }
                                tracing::Level::WARN => {
                                    tracing::warn!(
                                        http.latency = latency,
                                        http.status = status.as_u16(),
                                        "{log}"
                                    );
                                }
                                tracing::Level::ERROR => {
                                    tracing::error!(
                                        http.latency = latency,
                                        http.status = status.as_u16(),
                                        "{log}"
                                    );
                                }
                                _ => {
                                    tracing::debug!(
                                        http.latency = latency,
                                        http.status = status.as_u16(),
                                        "{log}"
                                    );
                                }
                            }

                            response
                        }
                        .instrument(span2)
                        .await
                    }
                    .instrument(span1)
                },
            )
    }};
}
193
194pub fn core_routes<S, A, G, D>(
195    server_state: &Arc<ServerState<S, A, G, D>>,
196) -> impl Filter<Extract = (impl warp::Reply + use<S, A, G, D>,), Error = Rejection>
197+ Clone
198+ use<S, A, G, D>
199where
200    S: StripeClient,
201    A: AppStoreClient,
202    G: GooglePlayClient,
203    D: DocumentService,
204{
205    core_req!(NewAccountRequest, ServerState::new_account, server_state)
206        .or(core_req!(NewAccountRequestV2, ServerState::new_account_v2, server_state))
207        .or(core_req!(ChangeDocRequest, ServerState::change_doc, server_state))
208        .or(core_req!(ChangeDocRequestV2, ServerState::change_doc_v2, server_state))
209        .or(core_req!(UpsertRequest, ServerState::upsert_file_metadata, server_state))
210        .or(core_req!(UpsertRequestV2, ServerState::upsert_file_metadata_v2, server_state))
211        .or(core_req!(GetDocRequest, ServerState::get_document, server_state))
212        .or(core_req!(GetPublicKeyRequest, ServerState::get_public_key, server_state))
213        .or(core_req!(GetUsernameRequest, ServerState::get_username, server_state))
214        .or(core_req!(GetUsageRequest, ServerState::get_usage, server_state))
215        .or(core_req!(GetFileIdsRequest, ServerState::get_file_ids, server_state))
216        .or(core_req!(GetUpdatesRequest, ServerState::get_updates, server_state))
217        .or(core_req!(GetUpdatesRequestV2, ServerState::get_updates_v2, server_state))
218        .or(core_req!(
219            UpgradeAccountGooglePlayRequest,
220            ServerState::upgrade_account_google_play,
221            server_state
222        ))
223        .or(core_req!(
224            UpgradeAccountStripeRequest,
225            ServerState::upgrade_account_stripe,
226            server_state
227        ))
228        .or(core_req!(
229            UpgradeAccountAppStoreRequest,
230            ServerState::upgrade_account_app_store,
231            server_state
232        ))
233        .or(core_req!(CancelSubscriptionRequest, ServerState::cancel_subscription, server_state))
234        .or(core_req!(GetSubscriptionInfoRequest, ServerState::get_subscription_info, server_state))
235        .or(core_req!(DeleteAccountRequest, ServerState::delete_account, server_state))
236        .or(core_req!(UpsertDebugInfoRequest, ServerState::upsert_debug_info, server_state))
237        .or(core_req!(
238            AdminDisappearAccountRequest,
239            ServerState::admin_disappear_account,
240            server_state
241        ))
242        .or(core_req!(AdminDisappearFileRequest, ServerState::admin_disappear_file, server_state))
243        .or(core_req!(AdminListUsersRequest, ServerState::admin_list_users, server_state))
244        .or(core_req!(
245            AdminGetAccountInfoRequest,
246            ServerState::admin_get_account_info,
247            server_state
248        ))
249        .or(core_req!(
250            AdminValidateAccountRequest,
251            ServerState::admin_validate_account,
252            server_state
253        ))
254        .or(core_req!(AdminValidateServerRequest, ServerState::admin_validate_server, server_state))
255        .or(core_req!(AdminFileInfoRequest, ServerState::admin_file_info, server_state))
256        .or(core_req!(AdminRebuildIndexRequest, ServerState::admin_rebuild_index, server_state))
257        .or(core_req!(AdminSetUserTierRequest, ServerState::admin_set_user_tier, server_state))
258}
259
260pub fn build_info() -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
261    warp::get()
262        .and(warp::path(&GetBuildInfoRequest::ROUTE[1..]))
263        .map(|| {
264            let span = span!(
265                Level::INFO,
266                "matched_request",
267                method = &GetBuildInfoRequest::METHOD.as_str(),
268                route = &GetBuildInfoRequest::ROUTE,
269            );
270            let _enter = span.enter();
271            let timer = router_service::HTTP_REQUEST_DURATION_HISTOGRAM
272                .with_label_values(&[GetBuildInfoRequest::ROUTE])
273                .start_timer();
274            let resp = get_build_info();
275            info!("request processed successfully");
276            let resp = warp::reply::json(&resp);
277            timer.observe_duration();
278            resp
279        })
280}
281
282pub fn get_metrics() -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone
283{
284    warp::get().and(warp::path("metrics")).map(|| {
285        let span = span!(Level::INFO, "matched_request", method = "GET", route = "/metrics",);
286        let _enter = span.enter();
287        match TextEncoder::new().encode_to_string(prometheus::gather().as_slice()) {
288            Ok(metrics) => {
289                info!("request processed successfully");
290                metrics
291            }
292            Err(err) => {
293                error!("Error preparing response for prometheus: {:?}", err);
294                String::new()
295            }
296        }
297    })
298}
299
300static STRIPE_WEBHOOK_ROUTE: &str = "stripe-webhooks";
301
302pub fn stripe_webhooks<S, A, G, D>(
303    server_state: &Arc<ServerState<S, A, G, D>>,
304) -> impl Filter<Extract = (impl warp::Reply + use<S, A, G, D>,), Error = warp::Rejection>
305+ Clone
306+ use<S, A, G, D>
307where
308    S: StripeClient,
309    A: AppStoreClient,
310    G: GooglePlayClient,
311    D: DocumentService,
312{
313    let cloned_state = server_state.clone();
314
315    warp::post()
316        .and(warp::path(STRIPE_WEBHOOK_ROUTE))
317        .and(warp::any().map(move || cloned_state.clone()))
318        .and(warp::body::bytes())
319        .and(warp::header::header("Stripe-Signature"))
320        .then(
321            |state: Arc<ServerState<S, A, G, D>>, request: Bytes, stripe_sig: HeaderValue| async move {
322                let span = span!(
323                    Level::INFO,
324                    "matched_request",
325                    method = "POST",
326                    route = format!("/{STRIPE_WEBHOOK_ROUTE}").as_str()
327                );
328                let _enter = span.enter();
329                info!("webhook routed");
330                let response = span
331                    .in_scope(|| ServerState::stripe_webhooks(&state, request, stripe_sig))
332                    .await;
333
334                match response {
335                    Ok(_) => warp::reply::with_status("".to_string(), StatusCode::OK),
336                    Err(e) => {
337                        error!("{:?}", e);
338
339                        let status_code = match e {
340                            ServerError::ClientError(StripeWebhookError::VerificationError(_))
341                            | ServerError::ClientError(StripeWebhookError::InvalidBody(_))
342                            | ServerError::ClientError(StripeWebhookError::InvalidHeader(_))
343                            | ServerError::ClientError(StripeWebhookError::ParseError(_)) => {
344                                StatusCode::BAD_REQUEST
345                            }
346                            ServerError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
347                        };
348
349                        warp::reply::with_status("".to_string(), status_code)
350                    }
351                }
352            },
353        )
354}
355
356static PLAY_WEBHOOK_ROUTE: &str = "google_play_notification_webhook";
357
358pub fn google_play_notification_webhooks<S, A, G, D>(
359    server_state: &Arc<ServerState<S, A, G, D>>,
360) -> impl Filter<Extract = (impl warp::Reply + use<S, A, G, D>,), Error = warp::Rejection>
361+ Clone
362+ use<S, A, G, D>
363where
364    S: StripeClient,
365    A: AppStoreClient,
366    G: GooglePlayClient,
367    D: DocumentService,
368{
369    let cloned_state = server_state.clone();
370
371    warp::post()
372        .and(warp::path(PLAY_WEBHOOK_ROUTE))
373        .and(warp::any().map(move || cloned_state.clone()))
374        .and(warp::body::bytes())
375        .and(warp::query::query::<HashMap<String, String>>())
376        .then(
377            |state: Arc<ServerState<S, A, G, D>>,
378             request: Bytes,
379             query_parameters: HashMap<String, String>| async move {
380                let span = span!(
381                    Level::INFO,
382                    "matched_request",
383                    method = "POST",
384                    route = format!("/{PLAY_WEBHOOK_ROUTE}").as_str()
385                );
386                let _enter = span.enter();
387                info!("webhook routed");
388                let response = span
389                    .in_scope(|| state.google_play_notification_webhooks(request, query_parameters))
390                    .await;
391                match response {
392                    Ok(_) => warp::reply::with_status("".to_string(), StatusCode::OK),
393                    Err(e) => {
394                        error!("{:?}", e);
395
396                        let status_code = match e {
397                            ServerError::ClientError(GooglePlayWebhookError::InvalidToken)
398                            | ServerError::ClientError(
399                                GooglePlayWebhookError::CannotRetrieveData,
400                            )
401                            | ServerError::ClientError(
402                                GooglePlayWebhookError::CannotDecodePubSubData(_),
403                            ) => StatusCode::BAD_REQUEST,
404                            ServerError::ClientError(
405                                GooglePlayWebhookError::CannotRetrieveUserInfo,
406                            )
407                            | ServerError::ClientError(
408                                GooglePlayWebhookError::CannotRetrievePublicKey,
409                            )
410                            | ServerError::ClientError(GooglePlayWebhookError::CannotParseTime)
411                            | ServerError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
412                        };
413
414                        warp::reply::with_status("".to_string(), status_code)
415                    }
416                }
417            },
418        )
419}
420
421static APP_STORE_WEBHOOK_ROUTE: &str = "app_store_notification_webhook";
422pub fn app_store_notification_webhooks<S, A, G, D>(
423    server_state: &Arc<ServerState<S, A, G, D>>,
424) -> impl Filter<Extract = (impl warp::Reply + use<S, A, G, D>,), Error = warp::Rejection>
425+ Clone
426+ use<S, A, G, D>
427where
428    S: StripeClient,
429    A: AppStoreClient,
430    G: GooglePlayClient,
431    D: DocumentService,
432{
433    let cloned_state = server_state.clone();
434
435    warp::post()
436        .and(warp::path(APP_STORE_WEBHOOK_ROUTE))
437        .and(warp::any().map(move || cloned_state.clone()))
438        .and(warp::body::bytes())
439        .then(|state: Arc<ServerState<S, A, G, D>>, body: Bytes| async move {
440            let span = span!(
441                Level::INFO,
442                "matched_request",
443                method = "POST",
444                route = format!("/{APP_STORE_WEBHOOK_ROUTE}").as_str()
445            );
446            let _enter = span.enter();
447            info!("webhook routed");
448            let response = span
449                .in_scope(|| state.app_store_notification_webhook(body))
450                .await;
451
452            match response {
453                Ok(_) => warp::reply::with_status("".to_string(), StatusCode::OK),
454                Err(e) => {
455                    error!("{:?}", e);
456
457                    let status_code = match e {
458                        ServerError::ClientError(AppStoreNotificationError::InvalidJWS) => {
459                            StatusCode::BAD_REQUEST
460                        }
461                        ServerError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
462                    };
463
464                    warp::reply::with_status("".to_string(), status_code)
465                }
466            }
467        })
468}
469
470pub fn method(name: Method) -> impl Filter<Extract = (), Error = Rejection> + Clone {
471    warp::method()
472        .and(warp::any().map(move || name.clone()))
473        .and_then(|request: Method, intention: Method| async move {
474            if request == intention { Ok(()) } else { Err(reject::not_found()) }
475        })
476        .untuple_one()
477}
478
479pub fn deserialize_and_check<Req>(
480    config: &Config, request: Bytes, version: Option<String>,
481) -> Result<RequestWrapper<Req>, ErrorWrapper<Req::Error>>
482where
483    Req: Request + DeserializeOwned + Serialize,
484{
485    handle_version_header::<Req>(config, &version)?;
486
487    let request = serde_json::from_slice(request.as_ref()).map_err(|err| {
488        warn!("Request parsing failure: {}", err);
489        ErrorWrapper::<Req::Error>::BadRequest
490    })?;
491
492    verify_auth(config, &request).map_err(|err| match err.kind {
493        LbErrKind::Sign(SignError::SignatureExpired(_))
494        | LbErrKind::Sign(SignError::SignatureInTheFuture(_)) => {
495            warn!("expired auth");
496            ErrorWrapper::<Req::Error>::ExpiredAuth
497        }
498        _ => {
499            warn!("invalid auth");
500            ErrorWrapper::<Req::Error>::InvalidAuth
501        }
502    })?;
503
504    Ok(request)
505}