Skip to main content

lockbook_server_lib/
router_service.rs

1use crate::billing::app_store_client::AppStoreClient;
2use crate::billing::billing_service::*;
3use crate::billing::google_play_client::GooglePlayClient;
4use crate::billing::stripe_client::StripeClient;
5use crate::config::Config;
6use crate::document_service::DocumentService;
7use crate::utils::get_build_info;
8use crate::{ServerError, ServerState, handle_version_header, router_service, verify_auth};
9use lazy_static::lazy_static;
10use lb_rs::model::api::{ErrorWrapper, Request, RequestWrapper, *};
11use lb_rs::model::errors::{LbErrKind, SignError};
12use prometheus::{
13    CounterVec, HistogramVec, TextEncoder, register_counter_vec, register_histogram_vec,
14};
15use serde::Serialize;
16use serde::de::DeserializeOwned;
17use std::collections::HashMap;
18use std::sync::Arc;
19use tracing::*;
20use warp::http::{HeaderValue, Method, StatusCode};
21use warp::hyper::body::Bytes;
22use warp::{Filter, Rejection, reject};
23
24lazy_static! {
25    pub static ref HTTP_REQUEST_DURATION_HISTOGRAM: HistogramVec = register_histogram_vec!(
26        "lockbook_server_request_duration_seconds",
27        "Lockbook server's HTTP request duration in seconds",
28        &["request"]
29    )
30    .unwrap();
31    pub static ref CORE_VERSION_COUNTER: CounterVec = register_counter_vec!(
32        "lockbook_server_core_version",
33        "Core version request attempts",
34        &["request"]
35    )
36    .unwrap();
37}
38
39#[macro_export]
40macro_rules! core_req {
41    ($Req: ty, $handler: path, $state: ident) => {{
42        use lb_rs::model::api::{ErrorWrapper, Request};
43        use lb_rs::model::file_metadata::Owner;
44        use std::net::SocketAddr;
45        use tracing::*;
46        use $crate::router_service::{self, deserialize_and_check, method};
47        use $crate::{RequestContext, ServerError};
48
49        let cloned_state = $state.clone();
50
51        method(<$Req>::METHOD)
52            .and(warp::path(&<$Req>::ROUTE[1..]))
53            .and(warp::any().map(move || cloned_state.clone()))
54            .and(warp::body::bytes())
55            .and(warp::header::optional::<String>("Accept-Version"))
56            .and(warp::filters::addr::remote())
57            .then(
58                |state: Arc<ServerState<S, A, G, D>>,
59                 request: Bytes,
60                 version: Option<String>,
61                 ip: Option<SocketAddr>| {
62                    if ip.is_none() {
63                        tracing::error!("ip not present in request");
64                    }
65                    let span1 = span!(
66                        Level::INFO,
67                        "matched_request",
68                        http.method = &<$Req>::METHOD.as_str(),
69                        http.url = &<$Req>::ROUTE,
70                        http.remote_ip = ip
71                            .map(|ip| ip.to_string())
72                            .unwrap_or_else(|| String::from("unsupported")),
73                        core_version = version
74                            .clone()
75                            .unwrap_or_else(|| String::from("not-present")),
76                    );
77
78                    async move {
79                        let state = state.as_ref();
80                        let timer = router_service::HTTP_REQUEST_DURATION_HISTOGRAM
81                            .with_label_values(&[<$Req>::ROUTE])
82                            .start_timer();
83
84                        let request: RequestWrapper<$Req> =
85                            match deserialize_and_check(&state.config, request, version) {
86                                Ok(req) => req,
87                                Err(err) => {
88                                    warn!("request failed to parse: {:?}", err);
89                                    return warp::reply::with_status(
90                                        warp::reply::json::<Result<RequestWrapper<$Req>, _>>(&Err(
91                                            err,
92                                        )),
93                                        warp::http::StatusCode::BAD_REQUEST,
94                                    );
95                                }
96                            };
97
98                        let req_pk = request.signed_request.public_key;
99                        let username = {
100                            let db = state.index_db.lock().await;
101                            match db
102                                .accounts
103                                .get()
104                                .get(&Owner(req_pk))
105                                .map(|account| account.username.clone())
106                            {
107                                Some(username) => username,
108                                None => "~unknown~".to_string(),
109                            }
110                        };
111                        let req_pk = base64::encode(req_pk.serialize_compressed());
112
113                        let span2 = span!(
114                            Level::INFO,
115                            "verified_request_signature",
116                            username = username.as_str(),
117                            public_key = req_pk.as_str()
118                        );
119                        if ip.is_none() {
120                            tracing::error!("ip not present in request");
121                        }
122                        let rc: RequestContext<$Req> = RequestContext {
123                            request: request.signed_request.timestamped_value.value,
124                            public_key: request.signed_request.public_key,
125                            ip,
126                        };
127
128                        async move {
129                            let status;
130                            let mut level = tracing::Level::INFO;
131                            let to_serialize = match $handler(state, rc).await {
132                                Ok(response) => {
133                                    status = warp::http::StatusCode::OK;
134                                    Ok(response)
135                                }
136                                Err(ServerError::ClientError(e)) => {
137                                    status = warp::http::StatusCode::BAD_REQUEST;
138                                    level = tracing::Level::WARN;
139                                    Err(ErrorWrapper::Endpoint(e))
140                                }
141                                Err(ServerError::InternalError(e)) => {
142                                    status = warp::http::StatusCode::INTERNAL_SERVER_ERROR;
143                                    level = tracing::Level::ERROR;
144                                    tracing::error!("internal: {e}");
145                                    Err(ErrorWrapper::InternalError)
146                                }
147                            };
148                            let response =
149                                warp::reply::with_status(warp::reply::json(&to_serialize), status);
150                            let log = format!("{status} {} {username}", &<$Req>::ROUTE);
151                            let latency = timer.stop_and_record();
152                            match level {
153                                tracing::Level::INFO => {
154                                    tracing::info!(
155                                        http.latency = latency,
156                                        http.status = status.as_u16(),
157                                        "{log}"
158                                    );
159                                }
160                                tracing::Level::WARN => {
161                                    tracing::warn!(
162                                        http.latency = latency,
163                                        http.status = status.as_u16(),
164                                        "{log}"
165                                    );
166                                }
167                                tracing::Level::ERROR => {
168                                    tracing::error!(
169                                        http.latency = latency,
170                                        http.status = status.as_u16(),
171                                        "{log}"
172                                    );
173                                }
174                                _ => {
175                                    tracing::debug!(
176                                        http.latency = latency,
177                                        http.status = status.as_u16(),
178                                        "{log}"
179                                    );
180                                }
181                            }
182
183                            response
184                        }
185                        .instrument(span2)
186                        .await
187                    }
188                    .instrument(span1)
189                },
190            )
191    }};
192}
193
194pub fn core_routes<S, A, G, D>(
195    server_state: &Arc<ServerState<S, A, G, D>>,
196) -> impl Filter<Extract = (impl warp::Reply + use<S, A, G, D>,), Error = Rejection>
197+ Clone
198+ use<S, A, G, D>
199where
200    S: StripeClient,
201    A: AppStoreClient,
202    G: GooglePlayClient,
203    D: DocumentService,
204{
205    core_req!(NewAccountRequestV2, ServerState::new_account_v2, server_state)
206        .or(core_req!(ChangeDocRequestV2, ServerState::change_doc_v2, server_state))
207        .or(core_req!(UpsertRequestV2, ServerState::upsert_file_metadata_v2, server_state))
208        .or(core_req!(GetDocRequest, ServerState::get_document, server_state))
209        .or(core_req!(GetPublicKeyRequest, ServerState::get_public_key, server_state))
210        .or(core_req!(GetUsernameRequest, ServerState::get_username, server_state))
211        .or(core_req!(GetUsageRequest, ServerState::get_usage, server_state))
212        .or(core_req!(GetFileIdsRequest, ServerState::get_file_ids, server_state))
213        .or(core_req!(GetUpdatesRequestV2, ServerState::get_updates_v2, server_state))
214        .or(core_req!(
215            UpgradeAccountGooglePlayRequest,
216            ServerState::upgrade_account_google_play,
217            server_state
218        ))
219        .or(core_req!(
220            UpgradeAccountStripeRequest,
221            ServerState::upgrade_account_stripe,
222            server_state
223        ))
224        .or(core_req!(
225            UpgradeAccountAppStoreRequest,
226            ServerState::upgrade_account_app_store,
227            server_state
228        ))
229        .or(core_req!(CancelSubscriptionRequest, ServerState::cancel_subscription, server_state))
230        .or(core_req!(GetSubscriptionInfoRequest, ServerState::get_subscription_info, server_state))
231        .or(core_req!(DeleteAccountRequest, ServerState::delete_account, server_state))
232        .or(core_req!(UpsertDebugInfoRequest, ServerState::upsert_debug_info, server_state))
233        .or(core_req!(
234            AdminDisappearAccountRequest,
235            ServerState::admin_disappear_account,
236            server_state
237        ))
238        .or(core_req!(AdminDisappearFileRequest, ServerState::admin_disappear_file, server_state))
239        .or(core_req!(AdminListUsersRequest, ServerState::admin_list_users, server_state))
240        .or(core_req!(
241            AdminGetAccountInfoRequest,
242            ServerState::admin_get_account_info,
243            server_state
244        ))
245        .or(core_req!(
246            AdminValidateAccountRequest,
247            ServerState::admin_validate_account,
248            server_state
249        ))
250        .or(core_req!(AdminValidateServerRequest, ServerState::admin_validate_server, server_state))
251        .or(core_req!(AdminFileInfoRequest, ServerState::admin_file_info, server_state))
252        .or(core_req!(AdminRebuildIndexRequest, ServerState::admin_rebuild_index, server_state))
253        .or(core_req!(AdminSetUserTierRequest, ServerState::admin_set_user_tier, server_state))
254}
255
256pub fn build_info() -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
257    warp::get()
258        .and(warp::path(&GetBuildInfoRequest::ROUTE[1..]))
259        .map(|| {
260            let span = span!(
261                Level::INFO,
262                "matched_request",
263                method = &GetBuildInfoRequest::METHOD.as_str(),
264                route = &GetBuildInfoRequest::ROUTE,
265            );
266            let _enter = span.enter();
267            let timer = router_service::HTTP_REQUEST_DURATION_HISTOGRAM
268                .with_label_values(&[GetBuildInfoRequest::ROUTE])
269                .start_timer();
270            let resp = get_build_info();
271            info!("request processed successfully");
272            let resp = warp::reply::json(&resp);
273            timer.observe_duration();
274            resp
275        })
276}
277
278pub fn get_metrics() -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone
279{
280    warp::get().and(warp::path("metrics")).map(|| {
281        let span = span!(Level::INFO, "matched_request", method = "GET", route = "/metrics",);
282        let _enter = span.enter();
283        match TextEncoder::new().encode_to_string(prometheus::gather().as_slice()) {
284            Ok(metrics) => {
285                info!("request processed successfully");
286                metrics
287            }
288            Err(err) => {
289                error!("Error preparing response for prometheus: {:?}", err);
290                String::new()
291            }
292        }
293    })
294}
295
296static STRIPE_WEBHOOK_ROUTE: &str = "stripe-webhooks";
297
298pub fn stripe_webhooks<S, A, G, D>(
299    server_state: &Arc<ServerState<S, A, G, D>>,
300) -> impl Filter<Extract = (impl warp::Reply + use<S, A, G, D>,), Error = warp::Rejection>
301+ Clone
302+ use<S, A, G, D>
303where
304    S: StripeClient,
305    A: AppStoreClient,
306    G: GooglePlayClient,
307    D: DocumentService,
308{
309    let cloned_state = server_state.clone();
310
311    warp::post()
312        .and(warp::path(STRIPE_WEBHOOK_ROUTE))
313        .and(warp::any().map(move || cloned_state.clone()))
314        .and(warp::body::bytes())
315        .and(warp::header::header("Stripe-Signature"))
316        .then(
317            |state: Arc<ServerState<S, A, G, D>>, request: Bytes, stripe_sig: HeaderValue| async move {
318                let span = span!(
319                    Level::INFO,
320                    "matched_request",
321                    method = "POST",
322                    route = format!("/{STRIPE_WEBHOOK_ROUTE}").as_str()
323                );
324                let _enter = span.enter();
325                info!("webhook routed");
326                let response = span
327                    .in_scope(|| ServerState::stripe_webhooks(&state, request, stripe_sig))
328                    .await;
329
330                match response {
331                    Ok(_) => warp::reply::with_status("".to_string(), StatusCode::OK),
332                    Err(e) => {
333                        error!("{:?}", e);
334
335                        let status_code = match e {
336                            ServerError::ClientError(StripeWebhookError::VerificationError(_))
337                            | ServerError::ClientError(StripeWebhookError::InvalidBody(_))
338                            | ServerError::ClientError(StripeWebhookError::InvalidHeader(_))
339                            | ServerError::ClientError(StripeWebhookError::ParseError(_)) => {
340                                StatusCode::BAD_REQUEST
341                            }
342                            ServerError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
343                        };
344
345                        warp::reply::with_status("".to_string(), status_code)
346                    }
347                }
348            },
349        )
350}
351
352static PLAY_WEBHOOK_ROUTE: &str = "google_play_notification_webhook";
353
354pub fn google_play_notification_webhooks<S, A, G, D>(
355    server_state: &Arc<ServerState<S, A, G, D>>,
356) -> impl Filter<Extract = (impl warp::Reply + use<S, A, G, D>,), Error = warp::Rejection>
357+ Clone
358+ use<S, A, G, D>
359where
360    S: StripeClient,
361    A: AppStoreClient,
362    G: GooglePlayClient,
363    D: DocumentService,
364{
365    let cloned_state = server_state.clone();
366
367    warp::post()
368        .and(warp::path(PLAY_WEBHOOK_ROUTE))
369        .and(warp::any().map(move || cloned_state.clone()))
370        .and(warp::body::bytes())
371        .and(warp::query::query::<HashMap<String, String>>())
372        .then(
373            |state: Arc<ServerState<S, A, G, D>>,
374             request: Bytes,
375             query_parameters: HashMap<String, String>| async move {
376                let span = span!(
377                    Level::INFO,
378                    "matched_request",
379                    method = "POST",
380                    route = format!("/{PLAY_WEBHOOK_ROUTE}").as_str()
381                );
382                let _enter = span.enter();
383                info!("webhook routed");
384                let response = span
385                    .in_scope(|| state.google_play_notification_webhooks(request, query_parameters))
386                    .await;
387                match response {
388                    Ok(_) => warp::reply::with_status("".to_string(), StatusCode::OK),
389                    Err(e) => {
390                        error!("{:?}", e);
391
392                        let status_code = match e {
393                            ServerError::ClientError(GooglePlayWebhookError::InvalidToken)
394                            | ServerError::ClientError(
395                                GooglePlayWebhookError::CannotRetrieveData,
396                            )
397                            | ServerError::ClientError(
398                                GooglePlayWebhookError::CannotDecodePubSubData(_),
399                            ) => StatusCode::BAD_REQUEST,
400                            ServerError::ClientError(
401                                GooglePlayWebhookError::CannotRetrieveUserInfo,
402                            )
403                            | ServerError::ClientError(
404                                GooglePlayWebhookError::CannotRetrievePublicKey,
405                            )
406                            | ServerError::ClientError(GooglePlayWebhookError::CannotParseTime)
407                            | ServerError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
408                        };
409
410                        warp::reply::with_status("".to_string(), status_code)
411                    }
412                }
413            },
414        )
415}
416
417static APP_STORE_WEBHOOK_ROUTE: &str = "app_store_notification_webhook";
418pub fn app_store_notification_webhooks<S, A, G, D>(
419    server_state: &Arc<ServerState<S, A, G, D>>,
420) -> impl Filter<Extract = (impl warp::Reply + use<S, A, G, D>,), Error = warp::Rejection>
421+ Clone
422+ use<S, A, G, D>
423where
424    S: StripeClient,
425    A: AppStoreClient,
426    G: GooglePlayClient,
427    D: DocumentService,
428{
429    let cloned_state = server_state.clone();
430
431    warp::post()
432        .and(warp::path(APP_STORE_WEBHOOK_ROUTE))
433        .and(warp::any().map(move || cloned_state.clone()))
434        .and(warp::body::bytes())
435        .then(|state: Arc<ServerState<S, A, G, D>>, body: Bytes| async move {
436            let span = span!(
437                Level::INFO,
438                "matched_request",
439                method = "POST",
440                route = format!("/{APP_STORE_WEBHOOK_ROUTE}").as_str()
441            );
442            let _enter = span.enter();
443            info!("webhook routed");
444            let response = span
445                .in_scope(|| state.app_store_notification_webhook(body))
446                .await;
447
448            match response {
449                Ok(_) => warp::reply::with_status("".to_string(), StatusCode::OK),
450                Err(e) => {
451                    error!("{:?}", e);
452
453                    let status_code = match e {
454                        ServerError::ClientError(AppStoreNotificationError::InvalidJWS) => {
455                            StatusCode::BAD_REQUEST
456                        }
457                        ServerError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
458                    };
459
460                    warp::reply::with_status("".to_string(), status_code)
461                }
462            }
463        })
464}
465
466pub fn method(name: Method) -> impl Filter<Extract = (), Error = Rejection> + Clone {
467    warp::method()
468        .and(warp::any().map(move || name.clone()))
469        .and_then(|request: Method, intention: Method| async move {
470            if request == intention { Ok(()) } else { Err(reject::not_found()) }
471        })
472        .untuple_one()
473}
474
475pub fn deserialize_and_check<Req>(
476    config: &Config, request: Bytes, version: Option<String>,
477) -> Result<RequestWrapper<Req>, ErrorWrapper<Req::Error>>
478where
479    Req: Request + DeserializeOwned + Serialize,
480{
481    handle_version_header::<Req>(config, &version)?;
482
483    let request = serde_json::from_slice(request.as_ref()).map_err(|err| {
484        warn!("Request parsing failure: {}", err);
485        ErrorWrapper::<Req::Error>::BadRequest
486    })?;
487
488    verify_auth(config, &request).map_err(|err| match err.kind {
489        LbErrKind::Sign(SignError::SignatureExpired(_))
490        | LbErrKind::Sign(SignError::SignatureInTheFuture(_)) => {
491            warn!("expired auth");
492            ErrorWrapper::<Req::Error>::ExpiredAuth
493        }
494        _ => {
495            warn!("invalid auth");
496            ErrorWrapper::<Req::Error>::InvalidAuth
497        }
498    })?;
499
500    Ok(request)
501}