exocore_transport/http/
server.rs

1use std::{borrow::Cow, net::Ipv4Addr, sync::Arc};
2
3use exocore_core::{
4    cell::{Cell, CellNodes, LocalNode, Node},
5    framing::{CapnpFrameBuilder, FrameBuilder},
6    futures::block_on,
7    sec::auth_token::AuthToken,
8    time::Clock,
9    utils::handle_set::HandleSet,
10};
11use exocore_protos::{
12    capnp,
13    generated::store_transport_capnp::{
14        mutation_request, mutation_response, query_request, query_response,
15    },
16};
17use futures::{channel::mpsc, lock::Mutex, FutureExt, StreamExt};
18use hyper::{
19    service::{make_service_fn, service_fn},
20    Body, Request, Response, Server, StatusCode,
21};
22
23use super::{
24    handles::{ServiceHandle, ServiceHandles},
25    requests::{RequestTracker, TrackedRequest},
26    HttpTransportConfig, HttpTransportServiceHandle,
27};
28use crate::{transport::ConnectionId, Error, InMessage, OutEvent, OutMessage, ServiceType};
29
30/// Unidirectional HTTP transport server used for request-response type of
31/// communication by clients for which a full libp2p transport is impossible.
32///
33/// Since it doesn't run a full fledge transport, authentication is achieved
34/// through a generated `AuthToken` signed by the public key of a node of the
35/// cell.
36///
37/// At the moment, this transport is only used for entity queries and mutations.
38pub struct HttpTransportServer {
39    local_node: LocalNode,
40    config: HttpTransportConfig,
41    clock: Clock,
42    service_handles: Arc<Mutex<ServiceHandles>>,
43    handle_set: HandleSet,
44}
45
46impl HttpTransportServer {
47    /// Creates a new HTTP server with the given configuration and clock.
48    pub fn new(
49        local_node: LocalNode,
50        config: HttpTransportConfig,
51        clock: Clock,
52    ) -> HttpTransportServer {
53        HttpTransportServer {
54            local_node,
55            config,
56            clock,
57            service_handles: Default::default(),
58            handle_set: Default::default(),
59        }
60    }
61
62    /// Get a transport handle that will be used by services. This handle can
63    /// only be used to receive messages and reply to them.
64    pub fn get_handle(
65        &mut self,
66        cell: Cell,
67        service_type: ServiceType,
68    ) -> Result<HttpTransportServiceHandle, Error> {
69        let (in_sender, in_receiver) = mpsc::channel(self.config.handle_in_channel_size);
70        let (out_sender, out_receiver) = mpsc::channel(self.config.handle_out_channel_size);
71
72        // Register new handle and its streams
73        let mut service_handles = block_on(self.service_handles.lock());
74        service_handles.push_handle(cell.clone(), service_type, in_sender, out_receiver);
75
76        info!(
77            "Registering transport for cell {} and service type {:?}",
78            cell, service_type
79        );
80
81        Ok(HttpTransportServiceHandle {
82            cell_id: cell.id().clone(),
83            service_type,
84            inner: Arc::downgrade(&self.service_handles),
85            sink: Some(out_sender),
86            stream: Some(in_receiver),
87            handle: self.handle_set.get_handle(),
88        })
89    }
90
91    /// Runs the HTTP server and returns when it's done.
92    pub async fn run(self) -> Result<(), Error> {
93        let request_tracker = Arc::new(RequestTracker::new(self.config.clone()));
94
95        // Listen on all addresses
96        let servers = {
97            let mut futures = Vec::new();
98            for listen_url in &self.config.listen_addresses(&self.local_node)? {
99                let host = listen_url
100                    .host()
101                    .unwrap_or_else(|| url::Host::Ipv4(Ipv4Addr::new(0, 0, 0, 0)));
102                let port = listen_url.port().unwrap_or(80);
103                let addr_res = format!("{}:{}", host, port).parse();
104                let addr = match addr_res {
105                    Ok(addr) => addr,
106                    Err(err) => {
107                        error!(
108                            "Couldn't extract and parse listen address from url {} ({}:{}): {}",
109                            listen_url, host, port, err
110                        );
111                        continue;
112                    }
113                };
114
115                info!("Starting a server on {} ({})", addr, listen_url);
116
117                let request_tracker = request_tracker.clone();
118                let service_handles = self.service_handles.clone();
119                let clock = self.clock.clone();
120
121                let server = Server::bind(&addr).serve(make_service_fn(move |_socket| {
122                    let request_tracker = request_tracker.clone();
123                    let service_handles = service_handles.clone();
124                    let clock = clock.clone();
125                    async move {
126                        Ok::<_, hyper::Error>(service_fn(move |req| {
127                            let request_tracker = request_tracker.clone();
128                            let service_handles = service_handles.clone();
129                            let clock = clock.clone();
130
131                            async {
132                                let resp =
133                                    handle_request(request_tracker, service_handles, clock, req)
134                                        .await;
135
136                                let resp = match resp {
137                                    Ok(resp) => resp,
138                                    Err(err) => {
139                                        error!("Error handling request: {}", err);
140                                        err.to_response()
141                                    }
142                                };
143
144                                Ok::<_, hyper::Error>(resp)
145                            }
146                        }))
147                    }
148                }));
149
150                futures.push(server);
151            }
152
153            futures::future::join_all(futures)
154        };
155
156        // Takes care of outgoing messages from services to be dispatched to connections
157        let handles_dispatcher = {
158            let services = self.service_handles.clone();
159            let request_tracker = request_tracker.clone();
160
161            let futures = async move {
162                let mut inner = services.lock().await;
163
164                let mut futures = Vec::new();
165                for service_channels in inner.service_handles.values_mut() {
166                    let mut out_receiver = service_channels
167                        .out_receiver
168                        .take()
169                        .expect("Out receiver of one service was already consumed");
170
171                    let connections = request_tracker.clone();
172                    futures.push(async move {
173                        while let Some(event) = out_receiver.next().await {
174                            match event {
175                                OutEvent::Message(message) => {
176                                    let connection_id = match message.connection {
177                                        Some(ConnectionId::HttpServer(id)) => id,
178                                        _ => {
179                                            warn!("Couldn't find connection id in message to be send back to connection");
180                                            continue;
181                                        }
182                                    };
183                                    connections.reply(connection_id, message).await;
184                                }
185                                OutEvent::Reset => {
186                                    // Nothing to do
187                                }
188                            }
189                        }
190                    });
191                }
192
193                futures
194            }.await;
195
196            futures::future::join_all(futures)
197        };
198
199        info!("HTTP transport now running");
200        futures::select! {
201            _ = servers.fuse() => {},
202            _ = handles_dispatcher.fuse() => {},
203            _ = self.handle_set.on_handles_dropped().fuse() => {},
204        };
205        info!("HTTP transport is done");
206
207        Ok(())
208    }
209}
210
211/// Handles a single request from a connection by sending it to the appropriate
212/// service.
213async fn handle_request(
214    request_tracker: Arc<RequestTracker>,
215    service_handles: Arc<Mutex<ServiceHandles>>,
216    clock: Clock,
217    req: Request<Body>,
218) -> Result<Response<Body>, RequestError> {
219    let request_type = RequestType::from_url_path(req.uri().path()).map_err(|err| {
220        error!("Invalid request type with path {}", req.uri().path());
221        err
222    })?;
223
224    // Authentify the request using the authentication token and extract cell & node
225    // from it
226    let auth_token_str = read_authorization_token(&req)?;
227    let auth_token = AuthToken::decode_base58_string(&auth_token_str).map_err(|err| {
228        warn!(
229            "Unauthorized request for {:?} using token {}: {}",
230            request_type, auth_token_str, err
231        );
232        RequestError::Unauthorized
233    })?;
234
235    let mut services = service_handles.lock().await;
236    let service = services
237        .get_handle(auth_token.cell_id(), request_type.service_type())
238        .ok_or_else(|| {
239            warn!("Cell {} not found for request", auth_token.cell_id());
240            RequestError::InvalidRequestType
241        })?;
242
243    let from_node = {
244        let cell_nodes = service.cell.nodes();
245        cell_nodes
246            .get(auth_token.node_id())
247            .map(|c| c.node().clone())
248            .ok_or_else(|| {
249                warn!(
250                    "Node {} not found in cell {} for request",
251                    auth_token.node_id(),
252                    auth_token.cell_id()
253                );
254                RequestError::InvalidRequestType
255            })?
256    };
257
258    // validate token
259    auth_token
260        .is_valid(&service.cell, &clock)
261        .map_err(|_| RequestError::Unauthorized)?;
262
263    match request_type {
264        RequestType::StoreQuery => {
265            let body_bytes = hyper::body::to_bytes(req.into_body()).await?;
266            let tracked_request = request_tracker.push().await;
267            let cell = service.cell.clone();
268
269            send_entity_query(
270                body_bytes.as_ref(),
271                &clock,
272                from_node,
273                service,
274                &tracked_request,
275            )
276            .await?;
277
278            drop(services); // drop handles to release lock while we wait for answer
279
280            Ok(receive_entity_query(&cell, tracked_request).await?)
281        }
282        RequestType::StoreMutation => {
283            let body_bytes = hyper::body::to_bytes(req.into_body()).await?;
284            let tracked_request = request_tracker.push().await;
285            let cell = service.cell.clone();
286
287            send_entity_mutation(
288                body_bytes.as_ref(),
289                &clock,
290                from_node,
291                service,
292                &tracked_request,
293            )
294            .await?;
295
296            drop(services); // drop handles to release lock while we wait for answer
297
298            Ok(receive_entity_mutation(&cell, tracked_request).await?)
299        }
300    }
301}
302
303async fn send_entity_query(
304    body_bytes: &[u8],
305    clock: &Clock,
306    from_node: Node,
307    service: &mut ServiceHandle,
308    tracked_request: &TrackedRequest,
309) -> Result<(), RequestError> {
310    let local_node = service.cell.local_node().node().clone();
311
312    let mut frame_builder = CapnpFrameBuilder::<query_request::Owned>::new();
313    let mut msg_builder = frame_builder.get_builder();
314    msg_builder.set_request(body_bytes);
315
316    let message =
317        OutMessage::from_framed_message(&service.cell, ServiceType::Store, frame_builder)?
318            .with_destination(local_node)
319            .with_rdv(clock.consistent_time(service.cell.local_node()))
320            .with_connection(ConnectionId::HttpServer(tracked_request.id()))
321            .to_in_message(from_node)?;
322
323    service.send_message(message)?;
324
325    Ok(())
326}
327
328async fn receive_entity_query(
329    cell: &Cell,
330    tracked_request: TrackedRequest,
331) -> Result<Response<Body>, RequestError> {
332    let local_node = cell.local_node().node().clone();
333
334    let response_message = tracked_request
335        .get_response_or_timeout()
336        .await
337        .map_err(|_| RequestError::Server("Couldn't receive response from handle".to_string()))?;
338
339    let message_envelope = response_message.envelope_builder.as_owned_frame();
340    let message = InMessage::from_node_and_frame(local_node, message_envelope)?;
341    let result_message = message.get_data_as_framed_message::<query_response::Owned>()?;
342    let result_reader = result_message.get_reader()?;
343
344    if !result_reader.has_error() {
345        let body = Body::from(result_reader.get_response()?.to_vec());
346        Ok(Response::new(body))
347    } else {
348        Err(RequestError::Query)
349    }
350}
351
352async fn send_entity_mutation(
353    body_bytes: &[u8],
354    clock: &Clock,
355    from_node: Node,
356    service: &mut ServiceHandle,
357    tracked_request: &TrackedRequest,
358) -> Result<(), RequestError> {
359    let local_node = service.cell.local_node().node().clone();
360
361    let mut frame_builder = CapnpFrameBuilder::<mutation_request::Owned>::new();
362    let mut msg_builder = frame_builder.get_builder();
363    msg_builder.set_request(body_bytes);
364
365    let message =
366        OutMessage::from_framed_message(&service.cell, ServiceType::Store, frame_builder)?
367            .with_destination(local_node)
368            .with_rdv(clock.consistent_time(service.cell.local_node()))
369            .with_connection(ConnectionId::HttpServer(tracked_request.id()))
370            .to_in_message(from_node)?;
371
372    service.send_message(message)?;
373
374    Ok(())
375}
376
377async fn receive_entity_mutation(
378    cell: &Cell,
379    tracked_request: TrackedRequest,
380) -> Result<Response<Body>, RequestError> {
381    let local_node = cell.local_node().node().clone();
382
383    let response_message = tracked_request
384        .get_response_or_timeout()
385        .await
386        .map_err(|_| RequestError::Server("Couldn't receive response from handle".to_string()))?;
387
388    let message_envelope = response_message.envelope_builder.as_owned_frame();
389    let message = InMessage::from_node_and_frame(local_node, message_envelope)?;
390    let result_message = message.get_data_as_framed_message::<mutation_response::Owned>()?;
391    let result_reader = result_message.get_reader()?;
392
393    if !result_reader.has_error() {
394        let body = Body::from(result_reader.get_response()?.to_vec());
395        Ok(Response::new(body))
396    } else {
397        Err(RequestError::Query)
398    }
399}
400
401fn read_authorization_token(request: &Request<Body>) -> Result<String, RequestError> {
402    let pq = request.uri();
403    let path_and_query = pq.path_and_query().ok_or(RequestError::Unauthorized)?;
404    let query = path_and_query.query().ok_or(RequestError::Unauthorized)?;
405
406    let params = url::form_urlencoded::parse(query.as_bytes());
407    let token = get_query_token(params).ok_or(RequestError::Unauthorized)?;
408
409    Ok(token.to_string())
410}
411
412fn get_query_token(pairs: url::form_urlencoded::Parse) -> Option<Cow<str>> {
413    for (key, value) in pairs {
414        if key == "token" {
415            return Some(value);
416        }
417    }
418
419    None
420}
421
422/// Type of an incoming HTTP request.
423#[derive(Debug, PartialEq)]
424enum RequestType {
425    StoreQuery,
426    StoreMutation,
427}
428
429impl RequestType {
430    fn from_url_path(path: &str) -> Result<RequestType, RequestError> {
431        if path == "/store/query" {
432            Ok(RequestType::StoreQuery)
433        } else if path == "/store/mutate" {
434            Ok(RequestType::StoreMutation)
435        } else {
436            Err(RequestError::InvalidRequestType)
437        }
438    }
439
440    fn service_type(&self) -> ServiceType {
441        match self {
442            RequestType::StoreQuery => ServiceType::Store,
443            RequestType::StoreMutation => ServiceType::Store,
444        }
445    }
446}
447
448/// Request related error.
449#[derive(Debug, thiserror::Error)]
450pub enum RequestError {
451    #[error("Invalid request type")]
452    InvalidRequestType,
453    #[error("Request unauthorized")]
454    Unauthorized,
455    #[error("Query error")]
456    Query,
457    #[error("Internal server error: {0}")]
458    Server(String),
459    #[error("Transport error: {0}")]
460    Transport(#[from] crate::Error),
461    #[error("Capnp serialization error: {0}")]
462    Serialization(#[from] capnp::Error),
463    #[error("Hyper error: {0}")]
464    Hyper(#[from] hyper::Error),
465}
466
467impl RequestError {
468    fn to_response(&self) -> Response<Body> {
469        let mut resp = Response::default();
470        let status = match self {
471            RequestError::InvalidRequestType => StatusCode::NOT_FOUND,
472            RequestError::Unauthorized => StatusCode::UNAUTHORIZED,
473            RequestError::Query => StatusCode::BAD_REQUEST,
474            _ => StatusCode::INTERNAL_SERVER_ERROR,
475        };
476
477        *resp.status_mut() = status;
478        resp
479    }
480}