1use std::{borrow::Cow, net::Ipv4Addr, sync::Arc};
2
3use exocore_core::{
4 cell::{Cell, CellNodes, LocalNode, Node},
5 framing::{CapnpFrameBuilder, FrameBuilder},
6 futures::block_on,
7 sec::auth_token::AuthToken,
8 time::Clock,
9 utils::handle_set::HandleSet,
10};
11use exocore_protos::{
12 capnp,
13 generated::store_transport_capnp::{
14 mutation_request, mutation_response, query_request, query_response,
15 },
16};
17use futures::{channel::mpsc, lock::Mutex, FutureExt, StreamExt};
18use hyper::{
19 service::{make_service_fn, service_fn},
20 Body, Request, Response, Server, StatusCode,
21};
22
23use super::{
24 handles::{ServiceHandle, ServiceHandles},
25 requests::{RequestTracker, TrackedRequest},
26 HttpTransportConfig, HttpTransportServiceHandle,
27};
28use crate::{transport::ConnectionId, Error, InMessage, OutEvent, OutMessage, ServiceType};
29
30pub struct HttpTransportServer {
39 local_node: LocalNode,
40 config: HttpTransportConfig,
41 clock: Clock,
42 service_handles: Arc<Mutex<ServiceHandles>>,
43 handle_set: HandleSet,
44}
45
46impl HttpTransportServer {
47 pub fn new(
49 local_node: LocalNode,
50 config: HttpTransportConfig,
51 clock: Clock,
52 ) -> HttpTransportServer {
53 HttpTransportServer {
54 local_node,
55 config,
56 clock,
57 service_handles: Default::default(),
58 handle_set: Default::default(),
59 }
60 }
61
62 pub fn get_handle(
65 &mut self,
66 cell: Cell,
67 service_type: ServiceType,
68 ) -> Result<HttpTransportServiceHandle, Error> {
69 let (in_sender, in_receiver) = mpsc::channel(self.config.handle_in_channel_size);
70 let (out_sender, out_receiver) = mpsc::channel(self.config.handle_out_channel_size);
71
72 let mut service_handles = block_on(self.service_handles.lock());
74 service_handles.push_handle(cell.clone(), service_type, in_sender, out_receiver);
75
76 info!(
77 "Registering transport for cell {} and service type {:?}",
78 cell, service_type
79 );
80
81 Ok(HttpTransportServiceHandle {
82 cell_id: cell.id().clone(),
83 service_type,
84 inner: Arc::downgrade(&self.service_handles),
85 sink: Some(out_sender),
86 stream: Some(in_receiver),
87 handle: self.handle_set.get_handle(),
88 })
89 }
90
91 pub async fn run(self) -> Result<(), Error> {
93 let request_tracker = Arc::new(RequestTracker::new(self.config.clone()));
94
95 let servers = {
97 let mut futures = Vec::new();
98 for listen_url in &self.config.listen_addresses(&self.local_node)? {
99 let host = listen_url
100 .host()
101 .unwrap_or_else(|| url::Host::Ipv4(Ipv4Addr::new(0, 0, 0, 0)));
102 let port = listen_url.port().unwrap_or(80);
103 let addr_res = format!("{}:{}", host, port).parse();
104 let addr = match addr_res {
105 Ok(addr) => addr,
106 Err(err) => {
107 error!(
108 "Couldn't extract and parse listen address from url {} ({}:{}): {}",
109 listen_url, host, port, err
110 );
111 continue;
112 }
113 };
114
115 info!("Starting a server on {} ({})", addr, listen_url);
116
117 let request_tracker = request_tracker.clone();
118 let service_handles = self.service_handles.clone();
119 let clock = self.clock.clone();
120
121 let server = Server::bind(&addr).serve(make_service_fn(move |_socket| {
122 let request_tracker = request_tracker.clone();
123 let service_handles = service_handles.clone();
124 let clock = clock.clone();
125 async move {
126 Ok::<_, hyper::Error>(service_fn(move |req| {
127 let request_tracker = request_tracker.clone();
128 let service_handles = service_handles.clone();
129 let clock = clock.clone();
130
131 async {
132 let resp =
133 handle_request(request_tracker, service_handles, clock, req)
134 .await;
135
136 let resp = match resp {
137 Ok(resp) => resp,
138 Err(err) => {
139 error!("Error handling request: {}", err);
140 err.to_response()
141 }
142 };
143
144 Ok::<_, hyper::Error>(resp)
145 }
146 }))
147 }
148 }));
149
150 futures.push(server);
151 }
152
153 futures::future::join_all(futures)
154 };
155
156 let handles_dispatcher = {
158 let services = self.service_handles.clone();
159 let request_tracker = request_tracker.clone();
160
161 let futures = async move {
162 let mut inner = services.lock().await;
163
164 let mut futures = Vec::new();
165 for service_channels in inner.service_handles.values_mut() {
166 let mut out_receiver = service_channels
167 .out_receiver
168 .take()
169 .expect("Out receiver of one service was already consumed");
170
171 let connections = request_tracker.clone();
172 futures.push(async move {
173 while let Some(event) = out_receiver.next().await {
174 match event {
175 OutEvent::Message(message) => {
176 let connection_id = match message.connection {
177 Some(ConnectionId::HttpServer(id)) => id,
178 _ => {
179 warn!("Couldn't find connection id in message to be send back to connection");
180 continue;
181 }
182 };
183 connections.reply(connection_id, message).await;
184 }
185 OutEvent::Reset => {
186 }
188 }
189 }
190 });
191 }
192
193 futures
194 }.await;
195
196 futures::future::join_all(futures)
197 };
198
199 info!("HTTP transport now running");
200 futures::select! {
201 _ = servers.fuse() => {},
202 _ = handles_dispatcher.fuse() => {},
203 _ = self.handle_set.on_handles_dropped().fuse() => {},
204 };
205 info!("HTTP transport is done");
206
207 Ok(())
208 }
209}
210
211async fn handle_request(
214 request_tracker: Arc<RequestTracker>,
215 service_handles: Arc<Mutex<ServiceHandles>>,
216 clock: Clock,
217 req: Request<Body>,
218) -> Result<Response<Body>, RequestError> {
219 let request_type = RequestType::from_url_path(req.uri().path()).map_err(|err| {
220 error!("Invalid request type with path {}", req.uri().path());
221 err
222 })?;
223
224 let auth_token_str = read_authorization_token(&req)?;
227 let auth_token = AuthToken::decode_base58_string(&auth_token_str).map_err(|err| {
228 warn!(
229 "Unauthorized request for {:?} using token {}: {}",
230 request_type, auth_token_str, err
231 );
232 RequestError::Unauthorized
233 })?;
234
235 let mut services = service_handles.lock().await;
236 let service = services
237 .get_handle(auth_token.cell_id(), request_type.service_type())
238 .ok_or_else(|| {
239 warn!("Cell {} not found for request", auth_token.cell_id());
240 RequestError::InvalidRequestType
241 })?;
242
243 let from_node = {
244 let cell_nodes = service.cell.nodes();
245 cell_nodes
246 .get(auth_token.node_id())
247 .map(|c| c.node().clone())
248 .ok_or_else(|| {
249 warn!(
250 "Node {} not found in cell {} for request",
251 auth_token.node_id(),
252 auth_token.cell_id()
253 );
254 RequestError::InvalidRequestType
255 })?
256 };
257
258 auth_token
260 .is_valid(&service.cell, &clock)
261 .map_err(|_| RequestError::Unauthorized)?;
262
263 match request_type {
264 RequestType::StoreQuery => {
265 let body_bytes = hyper::body::to_bytes(req.into_body()).await?;
266 let tracked_request = request_tracker.push().await;
267 let cell = service.cell.clone();
268
269 send_entity_query(
270 body_bytes.as_ref(),
271 &clock,
272 from_node,
273 service,
274 &tracked_request,
275 )
276 .await?;
277
278 drop(services); Ok(receive_entity_query(&cell, tracked_request).await?)
281 }
282 RequestType::StoreMutation => {
283 let body_bytes = hyper::body::to_bytes(req.into_body()).await?;
284 let tracked_request = request_tracker.push().await;
285 let cell = service.cell.clone();
286
287 send_entity_mutation(
288 body_bytes.as_ref(),
289 &clock,
290 from_node,
291 service,
292 &tracked_request,
293 )
294 .await?;
295
296 drop(services); Ok(receive_entity_mutation(&cell, tracked_request).await?)
299 }
300 }
301}
302
303async fn send_entity_query(
304 body_bytes: &[u8],
305 clock: &Clock,
306 from_node: Node,
307 service: &mut ServiceHandle,
308 tracked_request: &TrackedRequest,
309) -> Result<(), RequestError> {
310 let local_node = service.cell.local_node().node().clone();
311
312 let mut frame_builder = CapnpFrameBuilder::<query_request::Owned>::new();
313 let mut msg_builder = frame_builder.get_builder();
314 msg_builder.set_request(body_bytes);
315
316 let message =
317 OutMessage::from_framed_message(&service.cell, ServiceType::Store, frame_builder)?
318 .with_destination(local_node)
319 .with_rdv(clock.consistent_time(service.cell.local_node()))
320 .with_connection(ConnectionId::HttpServer(tracked_request.id()))
321 .to_in_message(from_node)?;
322
323 service.send_message(message)?;
324
325 Ok(())
326}
327
328async fn receive_entity_query(
329 cell: &Cell,
330 tracked_request: TrackedRequest,
331) -> Result<Response<Body>, RequestError> {
332 let local_node = cell.local_node().node().clone();
333
334 let response_message = tracked_request
335 .get_response_or_timeout()
336 .await
337 .map_err(|_| RequestError::Server("Couldn't receive response from handle".to_string()))?;
338
339 let message_envelope = response_message.envelope_builder.as_owned_frame();
340 let message = InMessage::from_node_and_frame(local_node, message_envelope)?;
341 let result_message = message.get_data_as_framed_message::<query_response::Owned>()?;
342 let result_reader = result_message.get_reader()?;
343
344 if !result_reader.has_error() {
345 let body = Body::from(result_reader.get_response()?.to_vec());
346 Ok(Response::new(body))
347 } else {
348 Err(RequestError::Query)
349 }
350}
351
352async fn send_entity_mutation(
353 body_bytes: &[u8],
354 clock: &Clock,
355 from_node: Node,
356 service: &mut ServiceHandle,
357 tracked_request: &TrackedRequest,
358) -> Result<(), RequestError> {
359 let local_node = service.cell.local_node().node().clone();
360
361 let mut frame_builder = CapnpFrameBuilder::<mutation_request::Owned>::new();
362 let mut msg_builder = frame_builder.get_builder();
363 msg_builder.set_request(body_bytes);
364
365 let message =
366 OutMessage::from_framed_message(&service.cell, ServiceType::Store, frame_builder)?
367 .with_destination(local_node)
368 .with_rdv(clock.consistent_time(service.cell.local_node()))
369 .with_connection(ConnectionId::HttpServer(tracked_request.id()))
370 .to_in_message(from_node)?;
371
372 service.send_message(message)?;
373
374 Ok(())
375}
376
377async fn receive_entity_mutation(
378 cell: &Cell,
379 tracked_request: TrackedRequest,
380) -> Result<Response<Body>, RequestError> {
381 let local_node = cell.local_node().node().clone();
382
383 let response_message = tracked_request
384 .get_response_or_timeout()
385 .await
386 .map_err(|_| RequestError::Server("Couldn't receive response from handle".to_string()))?;
387
388 let message_envelope = response_message.envelope_builder.as_owned_frame();
389 let message = InMessage::from_node_and_frame(local_node, message_envelope)?;
390 let result_message = message.get_data_as_framed_message::<mutation_response::Owned>()?;
391 let result_reader = result_message.get_reader()?;
392
393 if !result_reader.has_error() {
394 let body = Body::from(result_reader.get_response()?.to_vec());
395 Ok(Response::new(body))
396 } else {
397 Err(RequestError::Query)
398 }
399}
400
401fn read_authorization_token(request: &Request<Body>) -> Result<String, RequestError> {
402 let pq = request.uri();
403 let path_and_query = pq.path_and_query().ok_or(RequestError::Unauthorized)?;
404 let query = path_and_query.query().ok_or(RequestError::Unauthorized)?;
405
406 let params = url::form_urlencoded::parse(query.as_bytes());
407 let token = get_query_token(params).ok_or(RequestError::Unauthorized)?;
408
409 Ok(token.to_string())
410}
411
412fn get_query_token(pairs: url::form_urlencoded::Parse) -> Option<Cow<str>> {
413 for (key, value) in pairs {
414 if key == "token" {
415 return Some(value);
416 }
417 }
418
419 None
420}
421
422#[derive(Debug, PartialEq)]
424enum RequestType {
425 StoreQuery,
426 StoreMutation,
427}
428
429impl RequestType {
430 fn from_url_path(path: &str) -> Result<RequestType, RequestError> {
431 if path == "/store/query" {
432 Ok(RequestType::StoreQuery)
433 } else if path == "/store/mutate" {
434 Ok(RequestType::StoreMutation)
435 } else {
436 Err(RequestError::InvalidRequestType)
437 }
438 }
439
440 fn service_type(&self) -> ServiceType {
441 match self {
442 RequestType::StoreQuery => ServiceType::Store,
443 RequestType::StoreMutation => ServiceType::Store,
444 }
445 }
446}
447
448#[derive(Debug, thiserror::Error)]
450pub enum RequestError {
451 #[error("Invalid request type")]
452 InvalidRequestType,
453 #[error("Request unauthorized")]
454 Unauthorized,
455 #[error("Query error")]
456 Query,
457 #[error("Internal server error: {0}")]
458 Server(String),
459 #[error("Transport error: {0}")]
460 Transport(#[from] crate::Error),
461 #[error("Capnp serialization error: {0}")]
462 Serialization(#[from] capnp::Error),
463 #[error("Hyper error: {0}")]
464 Hyper(#[from] hyper::Error),
465}
466
467impl RequestError {
468 fn to_response(&self) -> Response<Body> {
469 let mut resp = Response::default();
470 let status = match self {
471 RequestError::InvalidRequestType => StatusCode::NOT_FOUND,
472 RequestError::Unauthorized => StatusCode::UNAUTHORIZED,
473 RequestError::Query => StatusCode::BAD_REQUEST,
474 _ => StatusCode::INTERNAL_SERVER_ERROR,
475 };
476
477 *resp.status_mut() = status;
478 resp
479 }
480}