bevy_remote/
http.rs

1//! The BRP transport using JSON-RPC over HTTP.
2//!
3//! Adding the [`RemoteHttpPlugin`] to your [`App`] causes Bevy to accept
4//! connections over HTTP (by default, on port 15702) while your app is running.
5//!
6//! Clients are expected to `POST` JSON requests to the root URL; see the `client`
7//! example for a trivial example of use.
8
9#![cfg(not(target_family = "wasm"))]
10
11use crate::{
12    error_codes, BrpBatch, BrpError, BrpMessage, BrpRequest, BrpResponse, BrpResult, BrpSender,
13};
14use anyhow::Result as AnyhowResult;
15use async_channel::{Receiver, Sender};
16use async_io::Async;
17use bevy_app::{App, Plugin, Startup};
18use bevy_ecs::resource::Resource;
19use bevy_ecs::system::Res;
20use bevy_tasks::{futures_lite::StreamExt, IoTaskPool};
21use core::{
22    convert::Infallible,
23    net::{IpAddr, Ipv4Addr},
24    pin::Pin,
25    task::{Context, Poll},
26};
27use http_body_util::{BodyExt as _, Full};
28use hyper::{
29    body::{Body, Bytes, Frame, Incoming},
30    header::{HeaderName, HeaderValue},
31    server::conn::http1,
32    service, Request, Response,
33};
34use serde_json::Value;
35use smol_hyper::rt::{FuturesIo, SmolTimer};
36use std::{
37    collections::HashMap,
38    net::{TcpListener, TcpStream},
39};
40
41/// The default port that Bevy will listen on.
42///
43/// This value was chosen randomly.
44pub const DEFAULT_PORT: u16 = 15702;
45
46/// The default host address that Bevy will use for its server.
47pub const DEFAULT_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
48
49/// A struct that holds a collection of HTTP headers.
50///
51/// This struct is used to store a set of HTTP headers as key-value pairs, where the keys are
52/// of type [`HeaderName`] and the values are of type [`HeaderValue`].
53///
54#[derive(Debug, Resource, Clone)]
55pub struct Headers {
56    headers: HashMap<HeaderName, HeaderValue>,
57}
58
59impl Headers {
60    /// Create a new instance of `Headers`.
61    pub fn new() -> Self {
62        Self {
63            headers: HashMap::default(),
64        }
65    }
66
67    /// Insert a key value pair to the `Headers` instance.
68    pub fn insert(
69        mut self,
70        name: impl TryInto<HeaderName>,
71        value: impl TryInto<HeaderValue>,
72    ) -> Self {
73        let Ok(header_name) = name.try_into() else {
74            panic!("Invalid header name")
75        };
76        let Ok(header_value) = value.try_into() else {
77            panic!("Invalid header value")
78        };
79        self.headers.insert(header_name, header_value);
80        self
81    }
82}
83
84impl Default for Headers {
85    fn default() -> Self {
86        Self::new()
87    }
88}
89
90/// Add this plugin to your [`App`] to allow remote connections over HTTP to inspect and modify entities.
91/// It requires the [`RemotePlugin`](super::RemotePlugin).
92///
93/// This BRP transport cannot be used when targeting WASM.
94///
95/// The defaults are:
96/// - [`DEFAULT_ADDR`] : 127.0.0.1.
97/// - [`DEFAULT_PORT`] : 15702.
98///
99pub struct RemoteHttpPlugin {
100    /// The address that Bevy will bind to.
101    address: IpAddr,
102    /// The port that Bevy will listen on.
103    port: u16,
104    /// The headers that Bevy will include in its HTTP responses
105    headers: Headers,
106}
107
108impl Default for RemoteHttpPlugin {
109    fn default() -> Self {
110        Self {
111            address: DEFAULT_ADDR,
112            port: DEFAULT_PORT,
113            headers: Headers::new(),
114        }
115    }
116}
117
118impl Plugin for RemoteHttpPlugin {
119    fn build(&self, app: &mut App) {
120        app.insert_resource(HostAddress(self.address))
121            .insert_resource(HostPort(self.port))
122            .insert_resource(HostHeaders(self.headers.clone()))
123            .add_systems(Startup, start_http_server);
124    }
125}
126
127impl RemoteHttpPlugin {
128    /// Set the IP address that the server will use.
129    #[must_use]
130    pub fn with_address(mut self, address: impl Into<IpAddr>) -> Self {
131        self.address = address.into();
132        self
133    }
134    /// Set the remote port that the server will listen on.
135    #[must_use]
136    pub fn with_port(mut self, port: u16) -> Self {
137        self.port = port;
138        self
139    }
140    /// Set the extra headers that the response will include.
141    ///
142    /// ////// /// # Example
143    ///
144    /// ```ignore
145    ///
146    /// // Create CORS headers
147    /// let cors_headers = Headers::new()
148    ///        .insert("Access-Control-Allow-Origin", "*")
149    ///        .insert("Access-Control-Allow-Headers", "Content-Type");
150    ///
151    /// // Create the Bevy app and add the RemoteHttpPlugin with CORS headers
152    /// fn main() {
153    ///     App::new()
154    ///     .add_plugins(DefaultPlugins)
155    ///     .add_plugins(RemotePlugin::default())
156    ///     .add_plugins(RemoteHttpPlugin::default()
157    ///         .with_headers(cors_headers))
158    ///     .run();
159    /// }
160    /// ```
161    #[must_use]
162    pub fn with_headers(mut self, headers: Headers) -> Self {
163        self.headers = headers;
164        self
165    }
166    /// Add a single header to the response headers.
167    #[must_use]
168    pub fn with_header(
169        mut self,
170        name: impl TryInto<HeaderName>,
171        value: impl TryInto<HeaderValue>,
172    ) -> Self {
173        self.headers = self.headers.insert(name, value);
174        self
175    }
176}
177
178/// A resource containing the IP address that Bevy will host on.
179///
180/// Currently, changing this while the application is running has no effect; this merely
181/// reflects the IP address that is set during the setup of the [`RemoteHttpPlugin`].
182#[derive(Debug, Resource)]
183pub struct HostAddress(pub IpAddr);
184
185/// A resource containing the port number that Bevy will listen on.
186///
187/// Currently, changing this while the application is running has no effect; this merely
188/// reflects the host that is set during the setup of the [`RemoteHttpPlugin`].
189#[derive(Debug, Resource)]
190pub struct HostPort(pub u16);
191
192/// A resource containing the headers that Bevy will include in its HTTP responses.
193///
194#[derive(Debug, Resource)]
195struct HostHeaders(pub Headers);
196
197/// A system that starts up the Bevy Remote Protocol HTTP server.
198fn start_http_server(
199    request_sender: Res<BrpSender>,
200    address: Res<HostAddress>,
201    remote_port: Res<HostPort>,
202    headers: Res<HostHeaders>,
203) {
204    IoTaskPool::get()
205        .spawn(server_main(
206            address.0,
207            remote_port.0,
208            request_sender.clone(),
209            headers.0.clone(),
210        ))
211        .detach();
212}
213
214/// The Bevy Remote Protocol server main loop.
215async fn server_main(
216    address: IpAddr,
217    port: u16,
218    request_sender: Sender<BrpMessage>,
219    headers: Headers,
220) -> AnyhowResult<()> {
221    listen(
222        Async::<TcpListener>::bind((address, port))?,
223        &request_sender,
224        &headers,
225    )
226    .await
227}
228
229async fn listen(
230    listener: Async<TcpListener>,
231    request_sender: &Sender<BrpMessage>,
232    headers: &Headers,
233) -> AnyhowResult<()> {
234    loop {
235        let (client, _) = listener.accept().await?;
236
237        let request_sender = request_sender.clone();
238        let headers = headers.clone();
239        IoTaskPool::get()
240            .spawn(async move {
241                let _ = handle_client(client, request_sender, headers).await;
242            })
243            .detach();
244    }
245}
246
247async fn handle_client(
248    client: Async<TcpStream>,
249    request_sender: Sender<BrpMessage>,
250    headers: Headers,
251) -> AnyhowResult<()> {
252    http1::Builder::new()
253        .timer(SmolTimer::new())
254        .serve_connection(
255            FuturesIo::new(client),
256            service::service_fn(|request| {
257                process_request_batch(request, &request_sender, &headers)
258            }),
259        )
260        .await?;
261
262    Ok(())
263}
264
265/// A helper function for the Bevy Remote Protocol server that handles a batch
266/// of requests coming from a client.
267async fn process_request_batch(
268    request: Request<Incoming>,
269    request_sender: &Sender<BrpMessage>,
270    headers: &Headers,
271) -> AnyhowResult<Response<BrpHttpBody>> {
272    let batch_bytes = request.into_body().collect().await?.to_bytes();
273    let batch: Result<BrpBatch, _> = serde_json::from_slice(&batch_bytes);
274
275    let result = match batch {
276        Ok(BrpBatch::Single(request)) => {
277            let response = process_single_request(request, request_sender).await?;
278            match response {
279                BrpHttpResponse::Complete(res) => {
280                    BrpHttpResponse::Complete(serde_json::to_string(&res)?)
281                }
282                BrpHttpResponse::Stream(stream) => BrpHttpResponse::Stream(stream),
283            }
284        }
285        Ok(BrpBatch::Batch(requests)) => {
286            let mut responses = Vec::new();
287
288            for request in requests {
289                let response = process_single_request(request, request_sender).await?;
290                match response {
291                    BrpHttpResponse::Complete(res) => responses.push(res),
292                    BrpHttpResponse::Stream(BrpStream { id, .. }) => {
293                        responses.push(BrpResponse::new(
294                            id,
295                            Err(BrpError {
296                                code: error_codes::INVALID_REQUEST,
297                                message: "Streaming can not be used in batch requests".to_string(),
298                                data: None,
299                            }),
300                        ));
301                    }
302                }
303            }
304
305            BrpHttpResponse::Complete(serde_json::to_string(&responses)?)
306        }
307        Err(err) => {
308            let err = BrpResponse::new(
309                None,
310                Err(BrpError {
311                    code: error_codes::INVALID_REQUEST,
312                    message: err.to_string(),
313                    data: None,
314                }),
315            );
316
317            BrpHttpResponse::Complete(serde_json::to_string(&err)?)
318        }
319    };
320
321    let mut response = match result {
322        BrpHttpResponse::Complete(serialized) => {
323            let mut response = Response::new(BrpHttpBody::Complete(Full::new(Bytes::from(
324                serialized.as_bytes().to_owned(),
325            ))));
326            response.headers_mut().insert(
327                hyper::header::CONTENT_TYPE,
328                HeaderValue::from_static("application/json"),
329            );
330            response
331        }
332        BrpHttpResponse::Stream(stream) => {
333            let mut response = Response::new(BrpHttpBody::Stream(stream));
334            response.headers_mut().insert(
335                hyper::header::CONTENT_TYPE,
336                HeaderValue::from_static("text/event-stream"),
337            );
338            response
339        }
340    };
341    for (key, value) in &headers.headers {
342        response.headers_mut().insert(key, value.clone());
343    }
344    Ok(response)
345}
346
347/// A helper function for the Bevy Remote Protocol server that processes a single
348/// request coming from a client.
349async fn process_single_request(
350    request: Value,
351    request_sender: &Sender<BrpMessage>,
352) -> AnyhowResult<BrpHttpResponse<BrpResponse, BrpStream>> {
353    // Reach in and get the request ID early so that we can report it even when parsing fails.
354    let id = request.as_object().and_then(|map| map.get("id")).cloned();
355
356    let request: BrpRequest = match serde_json::from_value(request) {
357        Ok(v) => v,
358        Err(err) => {
359            return Ok(BrpHttpResponse::Complete(BrpResponse::new(
360                id,
361                Err(BrpError {
362                    code: error_codes::INVALID_REQUEST,
363                    message: err.to_string(),
364                    data: None,
365                }),
366            )));
367        }
368    };
369
370    if request.jsonrpc != "2.0" {
371        return Ok(BrpHttpResponse::Complete(BrpResponse::new(
372            id,
373            Err(BrpError {
374                code: error_codes::INVALID_REQUEST,
375                message: String::from("JSON-RPC request requires `\"jsonrpc\": \"2.0\"`"),
376                data: None,
377            }),
378        )));
379    }
380
381    let watch = request.method.contains("+watch");
382    let size = if watch { 8 } else { 1 };
383    let (result_sender, result_receiver) = async_channel::bounded(size);
384
385    let _ = request_sender
386        .send(BrpMessage {
387            method: request.method,
388            params: request.params,
389            sender: result_sender,
390        })
391        .await;
392
393    if watch {
394        Ok(BrpHttpResponse::Stream(BrpStream {
395            id: request.id,
396            rx: Box::pin(result_receiver),
397        }))
398    } else {
399        let result = result_receiver.recv().await?;
400        Ok(BrpHttpResponse::Complete(BrpResponse::new(
401            request.id, result,
402        )))
403    }
404}
405
406struct BrpStream {
407    id: Option<Value>,
408    rx: Pin<Box<Receiver<BrpResult>>>,
409}
410
411impl Body for BrpStream {
412    type Data = Bytes;
413    type Error = Infallible;
414
415    fn poll_frame(
416        mut self: Pin<&mut Self>,
417        cx: &mut Context<'_>,
418    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
419        match self.as_mut().rx.poll_next(cx) {
420            Poll::Ready(result) => match result {
421                Some(result) => {
422                    let response = BrpResponse::new(self.id.clone(), result);
423                    let serialized = serde_json::to_string(&response).unwrap();
424                    let bytes =
425                        Bytes::from(format!("data: {serialized}\n\n").as_bytes().to_owned());
426                    let frame = Frame::data(bytes);
427                    Poll::Ready(Some(Ok(frame)))
428                }
429                None => Poll::Ready(None),
430            },
431            Poll::Pending => Poll::Pending,
432        }
433    }
434
435    fn is_end_stream(&self) -> bool {
436        self.rx.is_closed()
437    }
438}
439
440enum BrpHttpResponse<C, S> {
441    Complete(C),
442    Stream(S),
443}
444
445enum BrpHttpBody {
446    Complete(Full<Bytes>),
447    Stream(BrpStream),
448}
449
450impl Body for BrpHttpBody {
451    type Data = Bytes;
452    type Error = Infallible;
453
454    fn poll_frame(
455        self: Pin<&mut Self>,
456        cx: &mut Context<'_>,
457    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
458        match &mut *self.get_mut() {
459            BrpHttpBody::Complete(body) => Body::poll_frame(Pin::new(body), cx),
460            BrpHttpBody::Stream(body) => Body::poll_frame(Pin::new(body), cx),
461        }
462    }
463}