1#![cfg(not(target_family = "wasm"))]
10
11use crate::{
12 error_codes, BrpBatch, BrpError, BrpMessage, BrpRequest, BrpResponse, BrpResult, BrpSender,
13};
14use anyhow::Result as AnyhowResult;
15use async_channel::{Receiver, Sender};
16use async_io::Async;
17use bevy_app::{App, Plugin, Startup};
18use bevy_ecs::resource::Resource;
19use bevy_ecs::system::Res;
20use bevy_tasks::{futures_lite::StreamExt, IoTaskPool};
21use core::{
22 convert::Infallible,
23 net::{IpAddr, Ipv4Addr},
24 pin::Pin,
25 task::{Context, Poll},
26};
27use http_body_util::{BodyExt as _, Full};
28use hyper::{
29 body::{Body, Bytes, Frame, Incoming},
30 header::{HeaderName, HeaderValue},
31 server::conn::http1,
32 service, Request, Response,
33};
34use serde_json::Value;
35use smol_hyper::rt::{FuturesIo, SmolTimer};
36use std::{
37 collections::HashMap,
38 net::{TcpListener, TcpStream},
39};
40
/// The port used by [`RemoteHttpPlugin`] when no other port is configured
/// through [`RemoteHttpPlugin::with_port`].
pub const DEFAULT_PORT: u16 = 15702;
45
/// The address used by [`RemoteHttpPlugin`] when no other address is configured
/// through [`RemoteHttpPlugin::with_address`]: the IPv4 loopback address
/// `127.0.0.1`.
pub const DEFAULT_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST);
48
/// A set of HTTP headers, keyed by header name, that the server adds to every
/// response it builds.
///
/// Build one with [`Headers::new`] followed by chained [`Headers::insert`]
/// calls, then hand it to [`RemoteHttpPlugin::with_headers`].
#[derive(Debug, Resource, Clone)]
pub struct Headers {
    /// The configured headers; inserting a name twice keeps only the last value.
    headers: HashMap<HeaderName, HeaderValue>,
}
58
59impl Headers {
60 pub fn new() -> Self {
62 Self {
63 headers: HashMap::default(),
64 }
65 }
66
67 pub fn insert(
69 mut self,
70 name: impl TryInto<HeaderName>,
71 value: impl TryInto<HeaderValue>,
72 ) -> Self {
73 let Ok(header_name) = name.try_into() else {
74 panic!("Invalid header name")
75 };
76 let Ok(header_value) = value.try_into() else {
77 panic!("Invalid header value")
78 };
79 self.headers.insert(header_name, header_value);
80 self
81 }
82}
83
84impl Default for Headers {
85 fn default() -> Self {
86 Self::new()
87 }
88}
89
/// Plugin that serves remote-protocol requests over HTTP.
///
/// When built, it stores its configuration in the app as resources and
/// registers a [`Startup`] system that spawns the HTTP server on the
/// [`IoTaskPool`].
///
/// The defaults are [`DEFAULT_ADDR`], [`DEFAULT_PORT`] and no extra headers;
/// use the `with_*` builder methods to change them.
pub struct RemoteHttpPlugin {
    /// The address the server's TCP listener binds to.
    address: IpAddr,
    /// The port the server's TCP listener binds to.
    port: u16,
    /// Extra headers added to every HTTP response.
    headers: Headers,
}
107
108impl Default for RemoteHttpPlugin {
109 fn default() -> Self {
110 Self {
111 address: DEFAULT_ADDR,
112 port: DEFAULT_PORT,
113 headers: Headers::new(),
114 }
115 }
116}
117
118impl Plugin for RemoteHttpPlugin {
119 fn build(&self, app: &mut App) {
120 app.insert_resource(HostAddress(self.address))
121 .insert_resource(HostPort(self.port))
122 .insert_resource(HostHeaders(self.headers.clone()))
123 .add_systems(Startup, start_http_server);
124 }
125}
126
127impl RemoteHttpPlugin {
128 #[must_use]
130 pub fn with_address(mut self, address: impl Into<IpAddr>) -> Self {
131 self.address = address.into();
132 self
133 }
134 #[must_use]
136 pub fn with_port(mut self, port: u16) -> Self {
137 self.port = port;
138 self
139 }
140 #[must_use]
162 pub fn with_headers(mut self, headers: Headers) -> Self {
163 self.headers = headers;
164 self
165 }
166 #[must_use]
168 pub fn with_header(
169 mut self,
170 name: impl TryInto<HeaderName>,
171 value: impl TryInto<HeaderValue>,
172 ) -> Self {
173 self.headers = self.headers.insert(name, value);
174 self
175 }
176}
177
/// Resource holding the IP address the HTTP server binds to.
///
/// Inserted by [`RemoteHttpPlugin`] and read when the server is started.
#[derive(Debug, Resource)]
pub struct HostAddress(pub IpAddr);
184
/// Resource holding the port the HTTP server binds to.
///
/// Inserted by [`RemoteHttpPlugin`] and read when the server is started.
#[derive(Debug, Resource)]
pub struct HostPort(pub u16);
191
/// Resource holding the extra headers added to every HTTP response.
///
/// Inserted by [`RemoteHttpPlugin`] and read when the server is started.
#[derive(Debug, Resource)]
struct HostHeaders(pub Headers);
196
197fn start_http_server(
199 request_sender: Res<BrpSender>,
200 address: Res<HostAddress>,
201 remote_port: Res<HostPort>,
202 headers: Res<HostHeaders>,
203) {
204 IoTaskPool::get()
205 .spawn(server_main(
206 address.0,
207 remote_port.0,
208 request_sender.clone(),
209 headers.0.clone(),
210 ))
211 .detach();
212}
213
214async fn server_main(
216 address: IpAddr,
217 port: u16,
218 request_sender: Sender<BrpMessage>,
219 headers: Headers,
220) -> AnyhowResult<()> {
221 listen(
222 Async::<TcpListener>::bind((address, port))?,
223 &request_sender,
224 &headers,
225 )
226 .await
227}
228
229async fn listen(
230 listener: Async<TcpListener>,
231 request_sender: &Sender<BrpMessage>,
232 headers: &Headers,
233) -> AnyhowResult<()> {
234 loop {
235 let (client, _) = listener.accept().await?;
236
237 let request_sender = request_sender.clone();
238 let headers = headers.clone();
239 IoTaskPool::get()
240 .spawn(async move {
241 let _ = handle_client(client, request_sender, headers).await;
242 })
243 .detach();
244 }
245}
246
247async fn handle_client(
248 client: Async<TcpStream>,
249 request_sender: Sender<BrpMessage>,
250 headers: Headers,
251) -> AnyhowResult<()> {
252 http1::Builder::new()
253 .timer(SmolTimer::new())
254 .serve_connection(
255 FuturesIo::new(client),
256 service::service_fn(|request| {
257 process_request_batch(request, &request_sender, &headers)
258 }),
259 )
260 .await?;
261
262 Ok(())
263}
264
265async fn process_request_batch(
268 request: Request<Incoming>,
269 request_sender: &Sender<BrpMessage>,
270 headers: &Headers,
271) -> AnyhowResult<Response<BrpHttpBody>> {
272 let batch_bytes = request.into_body().collect().await?.to_bytes();
273 let batch: Result<BrpBatch, _> = serde_json::from_slice(&batch_bytes);
274
275 let result = match batch {
276 Ok(BrpBatch::Single(request)) => {
277 let response = process_single_request(request, request_sender).await?;
278 match response {
279 BrpHttpResponse::Complete(res) => {
280 BrpHttpResponse::Complete(serde_json::to_string(&res)?)
281 }
282 BrpHttpResponse::Stream(stream) => BrpHttpResponse::Stream(stream),
283 }
284 }
285 Ok(BrpBatch::Batch(requests)) => {
286 let mut responses = Vec::new();
287
288 for request in requests {
289 let response = process_single_request(request, request_sender).await?;
290 match response {
291 BrpHttpResponse::Complete(res) => responses.push(res),
292 BrpHttpResponse::Stream(BrpStream { id, .. }) => {
293 responses.push(BrpResponse::new(
294 id,
295 Err(BrpError {
296 code: error_codes::INVALID_REQUEST,
297 message: "Streaming can not be used in batch requests".to_string(),
298 data: None,
299 }),
300 ));
301 }
302 }
303 }
304
305 BrpHttpResponse::Complete(serde_json::to_string(&responses)?)
306 }
307 Err(err) => {
308 let err = BrpResponse::new(
309 None,
310 Err(BrpError {
311 code: error_codes::INVALID_REQUEST,
312 message: err.to_string(),
313 data: None,
314 }),
315 );
316
317 BrpHttpResponse::Complete(serde_json::to_string(&err)?)
318 }
319 };
320
321 let mut response = match result {
322 BrpHttpResponse::Complete(serialized) => {
323 let mut response = Response::new(BrpHttpBody::Complete(Full::new(Bytes::from(
324 serialized.as_bytes().to_owned(),
325 ))));
326 response.headers_mut().insert(
327 hyper::header::CONTENT_TYPE,
328 HeaderValue::from_static("application/json"),
329 );
330 response
331 }
332 BrpHttpResponse::Stream(stream) => {
333 let mut response = Response::new(BrpHttpBody::Stream(stream));
334 response.headers_mut().insert(
335 hyper::header::CONTENT_TYPE,
336 HeaderValue::from_static("text/event-stream"),
337 );
338 response
339 }
340 };
341 for (key, value) in &headers.headers {
342 response.headers_mut().insert(key, value.clone());
343 }
344 Ok(response)
345}
346
347async fn process_single_request(
350 request: Value,
351 request_sender: &Sender<BrpMessage>,
352) -> AnyhowResult<BrpHttpResponse<BrpResponse, BrpStream>> {
353 let id = request.as_object().and_then(|map| map.get("id")).cloned();
355
356 let request: BrpRequest = match serde_json::from_value(request) {
357 Ok(v) => v,
358 Err(err) => {
359 return Ok(BrpHttpResponse::Complete(BrpResponse::new(
360 id,
361 Err(BrpError {
362 code: error_codes::INVALID_REQUEST,
363 message: err.to_string(),
364 data: None,
365 }),
366 )));
367 }
368 };
369
370 if request.jsonrpc != "2.0" {
371 return Ok(BrpHttpResponse::Complete(BrpResponse::new(
372 id,
373 Err(BrpError {
374 code: error_codes::INVALID_REQUEST,
375 message: String::from("JSON-RPC request requires `\"jsonrpc\": \"2.0\"`"),
376 data: None,
377 }),
378 )));
379 }
380
381 let watch = request.method.contains("+watch");
382 let size = if watch { 8 } else { 1 };
383 let (result_sender, result_receiver) = async_channel::bounded(size);
384
385 let _ = request_sender
386 .send(BrpMessage {
387 method: request.method,
388 params: request.params,
389 sender: result_sender,
390 })
391 .await;
392
393 if watch {
394 Ok(BrpHttpResponse::Stream(BrpStream {
395 id: request.id,
396 rx: Box::pin(result_receiver),
397 }))
398 } else {
399 let result = result_receiver.recv().await?;
400 Ok(BrpHttpResponse::Complete(BrpResponse::new(
401 request.id, result,
402 )))
403 }
404}
405
/// A response body that emits one frame for every result received for a
/// watching request.
struct BrpStream {
    /// The id of the originating request, copied into every emitted response.
    id: Option<Value>,
    /// The receiving end of the channel on which results arrive.
    rx: Pin<Box<Receiver<BrpResult>>>,
}
410
411impl Body for BrpStream {
412 type Data = Bytes;
413 type Error = Infallible;
414
415 fn poll_frame(
416 mut self: Pin<&mut Self>,
417 cx: &mut Context<'_>,
418 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
419 match self.as_mut().rx.poll_next(cx) {
420 Poll::Ready(result) => match result {
421 Some(result) => {
422 let response = BrpResponse::new(self.id.clone(), result);
423 let serialized = serde_json::to_string(&response).unwrap();
424 let bytes =
425 Bytes::from(format!("data: {serialized}\n\n").as_bytes().to_owned());
426 let frame = Frame::data(bytes);
427 Poll::Ready(Some(Ok(frame)))
428 }
429 None => Poll::Ready(None),
430 },
431 Poll::Pending => Poll::Pending,
432 }
433 }
434
435 fn is_end_stream(&self) -> bool {
436 self.rx.is_closed()
437 }
438}
439
/// The outcome of processing a request: either a finished value or a stream
/// that will keep producing values.
enum BrpHttpResponse<C, S> {
    /// A response that is available in full.
    Complete(C),
    /// A response that is delivered incrementally.
    Stream(S),
}
444
/// The HTTP response body type used by the server.
enum BrpHttpBody {
    /// A fully buffered body.
    Complete(Full<Bytes>),
    /// A body whose frames are produced by a [`BrpStream`].
    Stream(BrpStream),
}
449
450impl Body for BrpHttpBody {
451 type Data = Bytes;
452 type Error = Infallible;
453
454 fn poll_frame(
455 self: Pin<&mut Self>,
456 cx: &mut Context<'_>,
457 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
458 match &mut *self.get_mut() {
459 BrpHttpBody::Complete(body) => Body::poll_frame(Pin::new(body), cx),
460 BrpHttpBody::Stream(body) => Body::poll_frame(Pin::new(body), cx),
461 }
462 }
463}