extern crate alloc;
use async_channel::{Receiver, Sender};
use bevy_app::{prelude::*, MainScheduleOrder};
use bevy_derive::{Deref, DerefMut};
use bevy_ecs::{
entity::Entity,
observer::On,
resource::Resource,
schedule::{
InternedScheduleLabel, IntoScheduleConfigs, ScheduleBuildMetadata, ScheduleBuilt,
ScheduleLabel, SystemSet,
},
system::{Commands, In, IntoSystem, ResMut, System, SystemId},
world::World,
};
use bevy_platform::collections::HashMap;
#[cfg(feature = "bevy_render")]
use bevy_render::{Render, RenderApp, RenderScheduleOrder, RenderStartup};
use bevy_utils::prelude::default;
use serde::{ser::SerializeMap, Deserialize, Serialize};
use serde_json::Value;
use std::sync::RwLock;
pub mod builtin_methods;
#[cfg(feature = "http")]
pub mod http;
pub mod schemas;
/// Capacity of the bounded request channel created by `setup_mailbox_channel`.
const CHANNEL_SIZE: usize = 16;
/// Plugin that collects remote method handlers and registers them as systems when built.
///
/// Handlers are stored behind `RwLock`s because `Plugin::build` only receives `&self`
/// yet needs to drain the lists and move the boxed systems into a world.
pub struct RemotePlugin {
/// Handlers registered on the main app's world during `build`.
methods: RwLock<Vec<(String, RemoteMethodHandler)>>,
/// Handlers registered on the `RenderApp` sub-app's world during `build`
/// (only drained when the `bevy_render` feature is enabled).
render_methods: RwLock<Vec<(String, RemoteMethodHandler)>>,
}
impl RemotePlugin {
/// Builds a plugin with no handlers queued for either the main or the render world.
fn empty() -> Self {
    let methods = RwLock::new(Vec::new());
    let render_methods = RwLock::new(Vec::new());
    Self {
        methods,
        render_methods,
    }
}
/// Queues `handler` as an instant remote method called `name` for the main world.
///
/// Thin wrapper around `with_method` with `to_main = true`.
#[inline]
pub fn with_method_main<M>(
self,
name: impl Into<String>,
handler: impl IntoSystem<In<Option<Value>>, BrpResult, M>,
) -> Self {
self.with_method(name, handler, true)
}
/// Queues `handler` as an instant remote method called `name` for the render world.
///
/// Thin wrapper around `with_method` with `to_main = false`.
#[inline]
pub fn with_method_render<M>(
self,
name: impl Into<String>,
handler: impl IntoSystem<In<Option<Value>>, BrpResult, M>,
) -> Self {
self.with_method(name, handler, false)
}
/// Queues an instant handler under `name`.
///
/// `to_main` selects the destination list: `true` targets the main-world handlers,
/// `false` the render-world handlers. The handler is converted into a boxed system
/// and stored as `RemoteMethodHandler::Instant`.
///
/// # Panics
///
/// Panics if the selected lock is poisoned.
#[must_use]
fn with_method<M>(
    mut self,
    name: impl Into<String>,
    handler: impl IntoSystem<In<Option<Value>>, BrpResult, M>,
    to_main: bool,
) -> Self {
    let lock = if to_main {
        &mut self.methods
    } else {
        &mut self.render_methods
    };
    // `get_mut` needs no locking: we own `self`, so no other borrow can exist.
    let queued = lock.get_mut().unwrap();
    let entry = (
        name.into(),
        RemoteMethodHandler::Instant(Box::new(IntoSystem::into_system(handler))),
    );
    queued.push(entry);
    self
}
/// Queues `handler` as a watching remote method called `name` for the main world.
///
/// Thin wrapper around `with_watching_method` with `to_main = true`.
#[inline]
pub fn with_watching_method_main<M>(
self,
name: impl Into<String>,
handler: impl IntoSystem<In<Option<Value>>, BrpResult<Option<Value>>, M>,
) -> Self {
self.with_watching_method(name, handler, true)
}
/// Queues `handler` as a watching remote method called `name` for the render world.
///
/// Thin wrapper around `with_watching_method` with `to_main = false`.
#[inline]
pub fn with_watching_method_render<M>(
self,
name: impl Into<String>,
handler: impl IntoSystem<In<Option<Value>>, BrpResult<Option<Value>>, M>,
) -> Self {
self.with_watching_method(name, handler, false)
}
/// Queues a watching handler under `name`.
///
/// `to_main` selects the destination list: `true` targets the main-world handlers,
/// `false` the render-world handlers. The handler is converted into a boxed system
/// and stored as `RemoteMethodHandler::Watching`.
///
/// # Panics
///
/// Panics if the selected lock is poisoned.
#[must_use]
fn with_watching_method<M>(
    mut self,
    name: impl Into<String>,
    handler: impl IntoSystem<In<Option<Value>>, BrpResult<Option<Value>>, M>,
    to_main: bool,
) -> Self {
    let lock = if to_main {
        &mut self.methods
    } else {
        &mut self.render_methods
    };
    // `get_mut` needs no locking: we own `self`, so no other borrow can exist.
    let queued = lock.get_mut().unwrap();
    let entry = (
        name.into(),
        RemoteMethodHandler::Watching(Box::new(IntoSystem::into_system(handler))),
    );
    queued.push(entry);
    self
}
/// Queues every built-in handler from `builtin_methods`.
///
/// `to_main` is forwarded to each `with_method` / `with_watching_method` call, so the
/// whole set lands either in the main-world list (`true`) or the render-world list
/// (`false`).
fn add_default_methods(self, to_main: bool) -> Self {
// Entity and component methods.
self.with_method(
builtin_methods::BRP_GET_COMPONENTS_METHOD,
builtin_methods::process_remote_get_components_request,
to_main,
)
.with_method(
builtin_methods::BRP_QUERY_METHOD,
builtin_methods::process_remote_query_request,
to_main,
)
.with_method(
builtin_methods::BRP_SPAWN_ENTITY_METHOD,
builtin_methods::process_remote_spawn_entity_request,
to_main,
)
.with_method(
builtin_methods::BRP_INSERT_COMPONENTS_METHOD,
builtin_methods::process_remote_insert_components_request,
to_main,
)
.with_method(
builtin_methods::BRP_REMOVE_COMPONENTS_METHOD,
builtin_methods::process_remote_remove_components_request,
to_main,
)
.with_method(
builtin_methods::BRP_DESPAWN_COMPONENTS_METHOD,
builtin_methods::process_remote_despawn_entity_request,
to_main,
)
.with_method(
builtin_methods::BRP_REPARENT_ENTITIES_METHOD,
builtin_methods::process_remote_reparent_entities_request,
to_main,
)
.with_method(
builtin_methods::BRP_LIST_COMPONENTS_METHOD,
builtin_methods::process_remote_list_components_request,
to_main,
)
.with_method(
builtin_methods::BRP_MUTATE_COMPONENTS_METHOD,
builtin_methods::process_remote_mutate_components_request,
to_main,
)
// Method discovery.
.with_method(
builtin_methods::RPC_DISCOVER_METHOD,
builtin_methods::process_remote_list_methods_request,
to_main,
)
// Watching variants of the component methods.
.with_watching_method(
builtin_methods::BRP_GET_COMPONENTS_AND_WATCH_METHOD,
builtin_methods::process_remote_get_components_watching_request,
to_main,
)
.with_watching_method(
builtin_methods::BRP_LIST_COMPONENTS_AND_WATCH_METHOD,
builtin_methods::process_remote_list_components_watching_request,
to_main,
)
// Resource methods.
.with_method(
builtin_methods::BRP_GET_RESOURCE_METHOD,
builtin_methods::process_remote_get_resources_request,
to_main,
)
.with_method(
builtin_methods::BRP_INSERT_RESOURCE_METHOD,
builtin_methods::process_remote_insert_resources_request,
to_main,
)
.with_method(
builtin_methods::BRP_REMOVE_RESOURCE_METHOD,
builtin_methods::process_remote_remove_resources_request,
to_main,
)
.with_method(
builtin_methods::BRP_MUTATE_RESOURCE_METHOD,
builtin_methods::process_remote_mutate_resources_request,
to_main,
)
.with_method(
builtin_methods::BRP_LIST_RESOURCES_METHOD,
builtin_methods::process_remote_list_resources_request,
to_main,
)
// Event, message and observer methods.
.with_method(
builtin_methods::BRP_TRIGGER_EVENT_METHOD,
builtin_methods::process_remote_trigger_event_request,
to_main,
)
.with_method(
builtin_methods::BRP_WRITE_MESSAGE_METHOD,
builtin_methods::process_remote_write_message_request,
to_main,
)
.with_watching_method(
builtin_methods::BRP_OBSERVE_METHOD,
builtin_methods::process_remote_observe_watching_request,
to_main,
)
// Registry schema and schedule introspection methods.
.with_method(
builtin_methods::BRP_REGISTRY_SCHEMA_METHOD,
builtin_methods::export_registry_types,
to_main,
)
.with_method(
builtin_methods::BRP_SCHEDULE_LIST,
builtin_methods::schedule_list,
to_main,
)
.with_method(
builtin_methods::BRP_SCHEDULE_GRAPH,
builtin_methods::schedule_graph,
to_main,
)
}
}
impl Default for RemotePlugin {
    /// Creates a plugin pre-populated with the built-in methods.
    ///
    /// The built-ins are always queued for the main world; with the `bevy_render`
    /// feature enabled they are queued for the render world as well.
    fn default() -> Self {
        let plugin = Self::empty().add_default_methods(true);
        #[cfg(feature = "bevy_render")]
        let plugin = plugin.add_default_methods(false);
        plugin
    }
}
impl Plugin for RemotePlugin {
fn build(&self, app: &mut App) {
let mut remote_methods = RemoteMethods::new();
let plugin_methods = &mut *self.methods.write().unwrap();
for (name, handler) in plugin_methods.drain(..) {
remote_methods.insert(
name.clone(),
match handler {
RemoteMethodHandler::Instant(system) => RemoteMethodSystemId::Instant(
app.main_mut().world_mut().register_boxed_system(system),
),
RemoteMethodHandler::Watching(system) => RemoteMethodSystemId::Watching(
app.main_mut().world_mut().register_boxed_system(system),
),
},
);
}
if remote_methods
.0
.contains_key(builtin_methods::BRP_SCHEDULE_GRAPH)
{
app.init_resource::<PreviousScheduleBuildMetadata>()
.add_observer(cache_schedule_build_metadata);
}
app.init_schedule(RemoteLast)
.world_mut()
.resource_mut::<MainScheduleOrder>()
.insert_after(Last, RemoteLast);
app.insert_resource(remote_methods)
.init_resource::<schemas::SchemaTypesMetadata>()
.init_resource::<RemoteWatchingRequests>()
.init_resource::<builtin_methods::BrpEventObservers>()
.add_systems(PreStartup, setup_mailbox_channel)
.configure_sets(
RemoteLast,
(RemoteSystems::ProcessRequests, RemoteSystems::Cleanup).chain(),
)
.add_systems(
RemoteLast,
(
(process_remote_requests, process_ongoing_watching_requests)
.chain()
.in_set(RemoteSystems::ProcessRequests),
remove_closed_watching_requests.in_set(RemoteSystems::Cleanup),
),
);
#[cfg(feature = "bevy_render")]
{
use bevy_ecs::schedule::common_conditions::run_once;
let Some(render_app) = app.get_sub_app_mut(RenderApp) else {
return;
};
let mut render_remote_methods = RemoteMethods::new();
let render_plugin_methods = &mut *self.render_methods.write().unwrap();
for (name, handler) in render_plugin_methods.drain(..) {
render_remote_methods.insert(
name,
match handler {
RemoteMethodHandler::Instant(system) => RemoteMethodSystemId::Instant(
render_app.world_mut().register_boxed_system(system),
),
RemoteMethodHandler::Watching(system) => RemoteMethodSystemId::Watching(
render_app.world_mut().register_boxed_system(system),
),
},
);
}
render_app
.init_schedule(RemoteLast)
.world_mut()
.resource_mut::<RenderScheduleOrder>()
.insert_after(Render, RemoteLast);
render_app
.insert_resource(render_remote_methods)
.init_resource::<schemas::SchemaTypesMetadata>()
.init_resource::<RemoteWatchingRequests>()
.add_systems(RenderStartup, setup_mailbox_channel.run_if(run_once))
.configure_sets(
RemoteLast,
(RemoteSystems::ProcessRequests, RemoteSystems::Cleanup).chain(),
)
.add_systems(
RemoteLast,
(
(process_remote_requests, process_ongoing_watching_requests)
.chain()
.in_set(RemoteSystems::ProcessRequests),
remove_closed_watching_requests.in_set(RemoteSystems::Cleanup),
),
);
}
}
}
/// Schedule in which remote requests are processed.
///
/// `RemotePlugin` inserts it after `Last` in the main schedule order and, with the
/// `bevy_render` feature, after `Render` in the render schedule order.
#[derive(ScheduleLabel, Clone, Debug, PartialEq, Eq, Hash, Default)]
pub struct RemoteLast;
/// System sets used inside `RemoteLast`.
///
/// `RemotePlugin` chains them so `ProcessRequests` runs before `Cleanup`.
#[derive(Debug, Hash, PartialEq, Eq, Clone, SystemSet)]
pub enum RemoteSystems {
/// Contains `process_remote_requests` followed by `process_ongoing_watching_requests`.
ProcessRequests,
/// Contains `remove_closed_watching_requests`.
Cleanup,
}
/// A not-yet-registered remote method handler, stored as a boxed system.
///
/// Both kinds take the request's optional JSON `params` as system input.
#[derive(Debug)]
pub enum RemoteMethodHandler {
/// Handler run once per request; its `BrpResult` is sent back to the requester.
Instant(Box<dyn System<In = In<Option<Value>>, Out = BrpResult>>),
/// Handler re-run for as long as the request stays open; returning `Ok(None)`
/// sends nothing for that run.
Watching(Box<dyn System<In = In<Option<Value>>, Out = BrpResult<Option<Value>>>>),
}
/// Id of a registered instant handler system: optional JSON params in, `BrpResult` out.
pub type RemoteInstantMethodSystemId = SystemId<In<Option<Value>>, BrpResult>;
/// Id of a registered watching handler system: optional JSON params in,
/// `BrpResult<Option<Value>>` out.
pub type RemoteWatchingMethodSystemId = SystemId<In<Option<Value>>, BrpResult<Option<Value>>>;
/// Id of a registered remote method handler, tagged with its kind.
#[derive(Debug, Clone, Copy)]
pub enum RemoteMethodSystemId {
/// Handler that is run once and answers immediately.
Instant(RemoteInstantMethodSystemId),
/// Handler that is stored in `RemoteWatchingRequests` and run repeatedly.
Watching(RemoteWatchingMethodSystemId),
}
/// Resource mapping each remote method name to the id of its handler system.
#[derive(Debug, Resource, Default)]
pub struct RemoteMethods(HashMap<String, RemoteMethodSystemId>);
impl RemoteMethods {
pub fn new() -> Self {
default()
}
pub fn insert(
&mut self,
method_name: impl Into<String>,
handler: RemoteMethodSystemId,
) -> Option<RemoteMethodSystemId> {
self.0.insert(method_name.into(), handler)
}
pub fn get(&self, method: &str) -> Option<&RemoteMethodSystemId> {
self.0.get(method)
}
pub fn methods(&self) -> Vec<String> {
self.0.keys().cloned().collect()
}
}
/// Resource holding the currently open watching requests, each paired with the id of
/// the handler system to re-run for it.
#[derive(Debug, Resource, Default)]
pub struct RemoteWatchingRequests(Vec<(BrpMessage, RemoteWatchingMethodSystemId)>);
/// A single remote request.
///
/// The `jsonrpc` version is not stored: serialization always writes `"2.0"` and
/// deserialization rejects anything else.
#[derive(Debug, Clone)]
pub struct BrpRequest {
/// Name of the method to invoke.
pub method: String,
/// Optional request id; omitted from serialized output when `None`.
pub id: Option<Value>,
/// Optional method parameters; omitted from serialized output when `None`.
pub params: Option<Value>,
}
impl Serialize for BrpRequest {
    /// Writes the request as a map with `jsonrpc` (always `"2.0"`) and `method`,
    /// followed by `id` and `params` only when they are present.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // The entry count depends on which optional fields are set, so no length hint is given.
        let mut entries = serializer.serialize_map(None)?;
        entries.serialize_entry("jsonrpc", "2.0")?;
        entries.serialize_entry("method", &self.method)?;
        for (key, optional) in [("id", &self.id), ("params", &self.params)] {
            if optional.is_some() {
                entries.serialize_entry(key, optional)?;
            }
        }
        entries.end()
    }
}
impl<'de> Deserialize<'de> for BrpRequest {
/// Reads a request from a map with the keys `jsonrpc`, `method`, `id` and `params`.
///
/// `jsonrpc` must be present and equal to `"2.0"`, `method` must be present, and no
/// key may appear twice; `id` and `params` are optional.
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::de::Deserializer<'de>,
{
use serde::de;
// Recognised map keys; variant names are matched in lowercase
// (`jsonrpc`, `method`, `id`, `params`).
#[derive(Deserialize)]
#[serde(field_identifier, rename_all = "lowercase")]
enum Field {
JsonRpc,
Method,
Id,
Params,
}
struct Visitor;
impl<'de> de::Visitor<'de> for Visitor {
type Value = BrpRequest;
fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
formatter.write_str("struct BrpRequest")
}
fn visit_map<V>(self, mut map: V) -> Result<Self::Value, V::Error>
where
V: de::MapAccess<'de>,
{
// Only whether a valid `jsonrpc` entry was seen is tracked; its value is not kept.
let mut jsonrpc = false;
let mut method = None;
let mut id = None;
let mut params = None;
while let Some(key) = map.next_key()? {
match key {
Field::JsonRpc => {
// The version is validated before the duplicate check, so a second
// `jsonrpc` entry with a wrong value reports the invalid value.
let value = map.next_value::<String>()?;
if value != "2.0" {
return Err(de::Error::invalid_value(
de::Unexpected::Str(&value),
&"2.0",
));
}
if jsonrpc {
return Err(de::Error::duplicate_field("jsonrpc"));
}
jsonrpc = true;
}
Field::Method => {
if method.is_some() {
return Err(de::Error::duplicate_field("method"));
}
method = Some(map.next_value()?);
}
Field::Id => {
if id.is_some() {
return Err(de::Error::duplicate_field("id"));
}
id = Some(map.next_value()?);
}
Field::Params => {
if params.is_some() {
return Err(de::Error::duplicate_field("params"));
}
params = Some(map.next_value()?);
}
}
}
if !jsonrpc {
return Err(de::Error::missing_field("jsonrpc"));
}
let method = method.ok_or_else(|| de::Error::missing_field("method"))?;
// `id` / `params` are `Option<Option<Value>>` here: flattening collapses a key
// that was present with a `None` value (presumably an explicit `null`) and a
// missing key into the same `None`.
let id = id.flatten();
let params = params.flatten();
Ok(BrpRequest { method, id, params })
}
}
deserializer.deserialize_map(Visitor)
}
}
/// A response to a remote request.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct BrpResponse {
/// Protocol version string; `BrpResponse::new` always sets it to `"2.0"`.
pub jsonrpc: &'static str,
/// Id of the request this response answers, if it had one.
pub id: Option<Value>,
/// Result or error; flattened so its key sits next to `jsonrpc` and `id`.
#[serde(flatten)]
pub payload: BrpPayload,
}
impl BrpResponse {
    /// Builds a version `"2.0"` response for the request `id`, converting `result`
    /// into the matching `BrpPayload` variant.
    #[must_use]
    pub fn new(id: Option<Value>, result: BrpResult) -> Self {
        let payload = BrpPayload::from(result);
        Self {
            jsonrpc: "2.0",
            id,
            payload,
        }
    }
}
/// Outcome carried by a `BrpResponse`.
///
/// Variant names are serialized in snake case, i.e. as `result` and `error`.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub enum BrpPayload {
/// The request succeeded with this value.
Result(Value),
/// The request failed with this error.
Error(BrpError),
}
impl From<BrpResult> for BrpPayload {
    /// Maps `Ok(value)` to `BrpPayload::Result` and `Err(error)` to `BrpPayload::Error`.
    fn from(value: BrpResult) -> Self {
        value.map_or_else(Self::Error, Self::Result)
    }
}
/// Error returned by a remote method or by the request dispatcher.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct BrpError {
/// Numeric error code; see the constants in `error_codes`.
pub code: i16,
/// Human-readable description of the failure.
pub message: String,
/// Optional additional payload; left out of serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<Value>,
}
impl BrpError {
#[must_use]
pub fn entity_not_found(entity: Entity) -> Self {
Self {
code: error_codes::ENTITY_NOT_FOUND,
message: format!("Entity {entity} not found"),
data: None,
}
}
#[must_use]
pub fn component_not_present(component: &str, entity: Entity) -> Self {
Self {
code: error_codes::COMPONENT_NOT_PRESENT,
message: format!("Component `{component}` not present in Entity {entity}"),
data: None,
}
}
#[must_use]
pub fn component_error<E: ToString>(error: E) -> Self {
Self {
code: error_codes::COMPONENT_ERROR,
message: error.to_string(),
data: None,
}
}
#[must_use]
pub fn resource_not_present(resource: &str) -> Self {
Self {
code: error_codes::RESOURCE_NOT_PRESENT,
message: format!("Resource `{resource}` not present in the world"),
data: None,
}
}
#[must_use]
pub fn resource_error<E: ToString>(error: E) -> Self {
Self {
code: error_codes::RESOURCE_ERROR,
message: error.to_string(),
data: None,
}
}
#[must_use]
pub fn internal<E: ToString>(error: E) -> Self {
Self {
code: error_codes::INTERNAL_ERROR,
message: error.to_string(),
data: None,
}
}
#[must_use]
pub fn self_reparent(entity: Entity) -> Self {
Self {
code: error_codes::SELF_REPARENT,
message: format!("Cannot reparent Entity {entity} to itself"),
data: None,
}
}
}
/// Numeric codes used in `BrpError::code`.
///
/// The `-327xx` / `-326xx` values are the pre-defined JSON-RPC 2.0 error codes; the
/// `-234xx` / `-235xx` values are specific to this protocol.
pub mod error_codes {
/// JSON-RPC: invalid JSON was received.
pub const PARSE_ERROR: i16 = -32700;
/// JSON-RPC: the payload is not a valid request object.
pub const INVALID_REQUEST: i16 = -32600;
/// JSON-RPC: the requested method does not exist.
pub const METHOD_NOT_FOUND: i16 = -32601;
/// JSON-RPC: invalid method parameters.
pub const INVALID_PARAMS: i16 = -32602;
/// JSON-RPC: internal error.
pub const INTERNAL_ERROR: i16 = -32603;
/// Used by `BrpError::entity_not_found`.
pub const ENTITY_NOT_FOUND: i16 = -23401;
/// Used by `BrpError::component_error`.
pub const COMPONENT_ERROR: i16 = -23402;
/// Used by `BrpError::component_not_present`.
pub const COMPONENT_NOT_PRESENT: i16 = -23403;
/// Used by `BrpError::self_reparent`.
pub const SELF_REPARENT: i16 = -23404;
/// Used by `BrpError::resource_error`.
pub const RESOURCE_ERROR: i16 = -23501;
/// Used by `BrpError::resource_not_present`.
pub const RESOURCE_NOT_PRESENT: i16 = -23502;
}
/// Result of a remote method: a value of type `T` (a JSON `Value` by default) or a `BrpError`.
pub type BrpResult<T = Value> = Result<T, BrpError>;
/// Either a list of raw JSON values or a single raw JSON value.
///
/// The enum is untagged, so no variant name appears in the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum BrpBatch {
/// Several values sent together.
Batch(Vec<Value>),
/// One value on its own.
Single(Value),
}
/// A request as delivered through the mailbox channel to the processing systems.
#[derive(Debug, Clone)]
pub struct BrpMessage {
/// Name of the method to look up in `RemoteMethods`.
pub method: String,
/// Optional parameters passed to the handler system as input.
pub params: Option<Value>,
/// Channel on which the handler's result(s) are sent back.
pub sender: Sender<BrpResult>,
}
/// Resource wrapping the sending half of the request mailbox created by
/// `setup_mailbox_channel`.
#[derive(Debug, Resource, Deref, DerefMut)]
pub struct BrpSender(Sender<BrpMessage>);
/// Resource wrapping the receiving half of the request mailbox; drained by
/// `process_remote_requests`.
#[derive(Debug, Resource, Deref, DerefMut)]
pub struct BrpReceiver(Receiver<BrpMessage>);
/// Creates the bounded request mailbox (capacity `CHANNEL_SIZE`) and queues commands
/// that insert its two ends as the `BrpSender` and `BrpReceiver` resources.
fn setup_mailbox_channel(mut commands: Commands) {
    let (tx, rx) = async_channel::bounded(CHANNEL_SIZE);
    let sender_resource = BrpSender(tx);
    let receiver_resource = BrpReceiver(rx);
    commands.insert_resource(sender_resource);
    commands.insert_resource(receiver_resource);
}
/// Drains the request mailbox and dispatches every pending message.
///
/// Does nothing if the `BrpReceiver` resource has not been inserted yet. For each
/// message:
/// - an unknown method name is answered with a `METHOD_NOT_FOUND` error,
/// - an instant handler is run immediately and its result (or an `INTERNAL_ERROR` if
///   the system could not be run) is sent back,
/// - a watching handler is not run here; the message is stored in
///   `RemoteWatchingRequests` for `process_ongoing_watching_requests`.
///
/// Failures to deliver a reply are ignored.
fn process_remote_requests(world: &mut World) {
    if !world.contains_resource::<BrpReceiver>() {
        return;
    }
    while let Ok(message) = world.resource_mut::<BrpReceiver>().try_recv() {
        let Some(&handler) = world.resource::<RemoteMethods>().get(&message.method) else {
            let _ = message.sender.force_send(Err(BrpError {
                code: error_codes::METHOD_NOT_FOUND,
                message: format!("Method `{}` not found", message.method),
                data: None,
            }));
            // One bad method name must not stall the requests queued behind it, so
            // keep draining instead of leaving the system.
            continue;
        };
        match handler {
            RemoteMethodSystemId::Instant(id) => {
                let result = match world.run_system_with(id, message.params) {
                    Ok(result) => result,
                    Err(error) => {
                        let _ = message.sender.force_send(Err(BrpError {
                            code: error_codes::INTERNAL_ERROR,
                            message: format!("Failed to run method handler: {error}"),
                            data: None,
                        }));
                        continue;
                    }
                };
                let _ = message.sender.force_send(result);
            }
            RemoteMethodSystemId::Watching(id) => {
                world
                    .resource_mut::<RemoteWatchingRequests>()
                    .0
                    .push((message, id));
            }
        }
    }
}
/// Runs the handler of every open watching request once and forwards what it produced.
///
/// A handler returning `Ok(None)` sends nothing. Values and errors are sent with
/// `try_send`; if that fails, the request's sender is closed so the cleanup system
/// can drop the request.
fn process_ongoing_watching_requests(world: &mut World) {
    world.resource_scope::<RemoteWatchingRequests, ()>(|world, requests| {
        for (message, system_id) in &requests.0 {
            let outcome = process_single_ongoing_watching_request(world, message, system_id);
            // `Ok(None)` becomes `None`: nothing to report on this run.
            let Some(update) = outcome.transpose() else {
                continue;
            };
            if message.sender.try_send(update).is_err() {
                message.sender.close();
            }
        }
    });
}
fn process_single_ongoing_watching_request(
world: &mut World,
message: &BrpMessage,
system_id: &RemoteWatchingMethodSystemId,
) -> BrpResult<Option<Value>> {
world
.run_system_with(*system_id, message.params.clone())
.map_err(|error| BrpError {
code: error_codes::INTERNAL_ERROR,
message: format!("Failed to run method handler: {error}"),
data: None,
})?
}
/// Drops every watching request whose sender has been closed.
///
/// Walks the list from the back so `swap_remove` never moves an entry that has not
/// been inspected yet; the order of the remaining requests is not preserved.
fn remove_closed_watching_requests(mut requests: ResMut<RemoteWatchingRequests>) {
    let mut index = requests.0.len();
    while index > 0 {
        index -= 1;
        // Read-only check first: mutable access is taken only when removing.
        if requests.0[index].0.sender.is_closed() {
            requests.0.swap_remove(index);
        }
    }
}
/// Resource caching build metadata per schedule label, filled by
/// `cache_schedule_build_metadata`.
#[derive(Resource, Default)]
struct PreviousScheduleBuildMetadata(HashMap<InternedScheduleLabel, ScheduleBuildMetadata>);
/// Observer that stores the build metadata of a freshly built schedule.
///
/// Only `edges_added_by_build_passes` is copied from the event; `warnings` is stored
/// empty. An existing entry for the same label is replaced.
fn cache_schedule_build_metadata(
    event: On<ScheduleBuilt>,
    mut metadata: ResMut<PreviousScheduleBuildMetadata>,
) {
    let edges_added_by_build_passes = event.build_metadata.edges_added_by_build_passes.clone();
    metadata.0.insert(
        event.label,
        ScheduleBuildMetadata {
            warnings: Vec::new(),
            edges_added_by_build_passes,
        },
    );
}
#[cfg(test)]
mod tests {
    use crate::BrpRequest;
    use serde_json::json;

    /// A request without a `params` key deserializes with `params == None`.
    #[test]
    fn deserialize_brp_request_params_optional() {
        let BrpRequest { method, id, params } = serde_json::from_str(
            r#"{
"jsonrpc": "2.0",
"method": "world.list_components",
"id": 1
}"#,
        )
        .unwrap();
        assert_eq!(method, "world.list_components");
        assert_eq!(id, Some(json!(1)));
        assert_eq!(params, None);
    }

    /// A request without an `id` key deserializes with `id == None`.
    #[test]
    fn deserialize_brp_request_id_optional() {
        let BrpRequest { method, id, params } = serde_json::from_str(
            r#"{
"jsonrpc": "2.0",
"method": "world.list_components",
"params": {
"number": 5
}
}"#,
        )
        .unwrap();
        assert_eq!(method, "world.list_components");
        assert_eq!(id, None);
        assert_eq!(params, Some(json!({ "number": 5 })));
    }
}