use std::ops::{Deref, DerefMut};
use bevy::{
ecs::system::{EntityCommands, IntoObserverSystem, SystemParam},
prelude::*,
tasks::IoTaskPool,
};
pub use reqwest;
#[cfg(target_family = "wasm")]
use crossbeam_channel::{bounded, Receiver};
#[cfg(feature = "json")]
pub use json::*;
pub use reqwest::header::HeaderMap;
pub use reqwest::{StatusCode, Version};
#[cfg(not(target_family = "wasm"))]
use {bevy::tasks::Task, futures_lite::future};
/// System set that the plugin's request-handling systems are placed in.
///
/// Both the cleanup system and the polling system registered by
/// `ReqwestPlugin::build` are added to this set, so other systems can be
/// ordered relative to it.
#[derive(Debug, Hash, PartialEq, Eq, Clone, SystemSet)]
pub struct ReqwestSet;
/// Plugin that sets up the shared HTTP client resource and the systems that
/// drive in-flight requests.
pub struct ReqwestPlugin {
    /// When `true`, an entity that receives a `ReqwestInflight` component and
    /// has no `Name` yet is given the name `http: <url>`.
    pub automatically_name_requests: bool,
}

impl Default for ReqwestPlugin {
    /// Creates the plugin with automatic request naming switched on.
    fn default() -> Self {
        let automatically_name_requests = true;
        Self {
            automatically_name_requests,
        }
    }
}
impl Plugin for ReqwestPlugin {
    /// Wires the plugin into `app`:
    ///
    /// * initialises the `ReqwestClient` resource,
    /// * optionally installs an `on_insert` hook that names request entities,
    /// * schedules the cleanup and polling systems in `PreUpdate`, chained in
    ///   that order, inside `ReqwestSet`.
    fn build(&self, app: &mut App) {
        app.init_resource::<ReqwestClient>();

        if self.automatically_name_requests {
            let hooks = app
                .world_mut()
                .register_component_hooks::<ReqwestInflight>();
            hooks.on_insert(|mut world, ctx| {
                // The hook fires for `ReqwestInflight`, so the component is
                // present on the entity at this point.
                let inflight = world.get::<ReqwestInflight>(ctx.entity).unwrap();
                let url = inflight.url.clone();

                // Never overwrite a name the user already assigned.
                if world.get::<Name>(ctx.entity).is_some() {
                    return;
                }

                world
                    .commands()
                    .get_entity(ctx.entity)
                    .unwrap()
                    .insert(Name::new(format!("http: {url}")));
            });
        }

        let request_systems = (
            Self::remove_finished_requests,
            Self::poll_inflight_requests_to_bytes,
        )
            .chain()
            .in_set(ReqwestSet);
        app.add_systems(PreUpdate, request_systems);
    }
}
impl ReqwestPlugin {
fn remove_finished_requests(
mut commands: Commands,
q: Query<Entity, (With<DespawnReqwestEntity>, Without<ReqwestInflight>)>,
) {
for e in q.iter() {
if let Ok(mut ec) = commands.get_entity(e) {
ec.despawn();
}
}
}
fn poll_inflight_requests_to_bytes(
mut commands: Commands,
mut requests: Query<(Entity, &mut ReqwestInflight)>,
) {
for (entity, mut request) in requests.iter_mut() {
debug!("polling: {entity:?}");
if let Some((result, parts)) = request.poll() {
match result {
Ok(body) => {
let parts = parts.unwrap();
commands.trigger(ReqwestResponseEvent::new(
entity,
body.clone(),
parts.status,
parts.headers,
));
}
Err(err) => {
commands.trigger(ReqwestErrorEvent { entity, error: err });
}
}
if let Ok(mut ec) = commands.get_entity(entity) {
ec.remove::<ReqwestInflight>();
}
}
}
}
}
/// Builder returned when a request is sent; wraps the `EntityCommands` of the
/// entity that carries the request so observers can be attached to it.
pub struct BevyReqwestBuilder<'a>(EntityCommands<'a>);
impl<'a> BevyReqwestBuilder<'a> {
    /// Attaches `onresponse` as an observer of `ReqwestResponseEvent` on the
    /// request entity and returns the builder for further chaining.
    pub fn on_response<RB: Bundle, RM, OR: IntoObserverSystem<ReqwestResponseEvent, RB, RM>>(
        mut self,
        onresponse: OR,
    ) -> Self {
        self.0.observe(onresponse);
        self
    }

    /// Attaches `onresponse` as an observer of `JsonResponse<T>` on the
    /// request entity.
    ///
    /// An additional internal observer listens for `ReqwestResponseEvent`,
    /// deserializes the body as JSON into `T`, and triggers `JsonResponse<T>`
    /// on success. On failure the error is logged and no `JsonResponse` is
    /// triggered.
    #[cfg(feature = "json")]
    pub fn on_json_response<
        T: Send + Sync + serde::de::DeserializeOwned + 'static,
        RB: Bundle,
        RM,
        OR: IntoObserverSystem<json::JsonResponse<T>, RB, RM>,
    >(
        mut self,
        onresponse: OR,
    ) -> Self {
        self.0
            .observe(|evt: On<ReqwestResponseEvent>, mut commands: Commands| {
                let response = evt.event();
                match response.deserialize_json::<T>() {
                    Ok(data) => {
                        commands.trigger(json::JsonResponse {
                            entity: response.entity,
                            data,
                        });
                    }
                    Err(e) => {
                        bevy::log::error!("deserialization error: {e}");
                        // The failed operation is a *de*serialization of the
                        // response body, so the message says so.
                        bevy::log::debug!(
                            "tried deserializing: {}",
                            response.as_str().unwrap_or("failed getting event data")
                        );
                    }
                }
            });
        self.0.observe(onresponse);
        self
    }

    /// Attaches `onerror` as an observer of `ReqwestErrorEvent` on the request
    /// entity and returns the builder for further chaining.
    pub fn on_error<EB: Bundle, EM, OE: IntoObserverSystem<ReqwestErrorEvent, EB, EM>>(
        mut self,
        onerror: OE,
    ) -> Self {
        self.0.observe(onerror);
        self
    }
}
/// System parameter for sending HTTP requests from a system.
///
/// Bundles the `Commands` used to spawn or modify request entities with the
/// shared `ReqwestClient` resource.
#[derive(SystemParam)]
pub struct BevyReqwest<'w, 's> {
// Used to spawn request entities or insert the request component on existing ones.
commands: Commands<'w, 's>,
// Shared HTTP client; cloned into every spawned request task.
client: Res<'w, ReqwestClient>,
}
impl<'w, 's> BevyReqwest<'w, 's> {
pub fn send(&mut self, req: reqwest::Request) -> BevyReqwestBuilder<'_> {
let inflight = self.create_inflight_task(req);
BevyReqwestBuilder(self.commands.spawn((inflight, DespawnReqwestEntity)))
}
pub fn send_using_entity(
&mut self,
entity: Entity,
req: reqwest::Request,
) -> Result<BevyReqwestBuilder<'_>, Box<dyn std::error::Error>> {
let inflight = self.create_inflight_task(req);
let mut ec = self.commands.get_entity(entity)?;
info!("inserting request on entity: {:?}", entity);
ec.insert(inflight);
Ok(BevyReqwestBuilder(ec))
}
pub fn client(&self) -> &reqwest::Client {
&self.client.0
}
fn create_inflight_task(&self, request: reqwest::Request) -> ReqwestInflight {
let thread_pool = IoTaskPool::get();
let client = self.client.0.clone();
let url = request.url().to_string();
#[cfg(target_family = "wasm")]
let task = {
let (tx, task) = bounded(1);
thread_pool
.spawn(async move {
let r = client.execute(request).await;
let r = match r {
Ok(res) => {
let parts = Parts {
status: res.status(),
headers: res.headers().clone(),
};
(res.bytes().await, Some(parts))
}
Err(r) => (Err(r), None),
};
tx.send(r).ok();
})
.detach();
task
};
#[cfg(not(target_family = "wasm"))]
let task = {
thread_pool.spawn(async move {
let task_res = async_compat::Compat::new(async {
let p = client.execute(request).await;
match p {
Ok(res) => {
let parts = Parts {
status: res.status(),
headers: res.headers().clone(),
};
(res.bytes().await, Some(parts))
}
Err(e) => (Err(e), None),
}
})
.await;
task_res
})
};
ReqwestInflight::new(task, url)
}
}
/// Lets a `BevyReqwest` be used wherever a `&reqwest::Client` is expected,
/// e.g. to call the client's request-building methods directly.
impl Deref for BevyReqwest<'_, '_> {
    type Target = reqwest::Client;

    fn deref(&self) -> &reqwest::Client {
        &self.client.0
    }
}
/// Marker component: an entity that has this marker and no `ReqwestInflight`
/// is despawned by the plugin's cleanup system.
///
/// `BevyReqwest::send` adds it to the entities it spawns.
#[derive(Component)]
pub struct DespawnReqwestEntity;
#[derive(Resource)]
pub struct ReqwestClient(pub reqwest::Client);
impl Default for ReqwestClient {
fn default() -> Self {
Self(reqwest::Client::new())
}
}
impl std::ops::Deref for ReqwestClient {
type Target = reqwest::Client;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for ReqwestClient {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// Outcome of a request task: the body bytes (or the error) paired with the
/// response status/headers, which are `None` when `execute` itself failed.
type Resp = (reqwest::Result<bytes::Bytes>, Option<Parts>);
/// Component marking an entity as having a request in progress.
///
/// It is removed by the polling system once the request has produced a result.
#[derive(Component)]
#[component(storage = "SparseSet")]
pub struct ReqwestInflight {
/// URL of the request as a string; used when auto-naming the entity.
pub(crate) url: String,
// Native targets: handle of the spawned task, polled for completion.
#[cfg(not(target_family = "wasm"))]
res: Task<Resp>,
// wasm targets: receiving end of the channel the detached task sends its result on.
#[cfg(target_family = "wasm")]
res: Receiver<Resp>,
}
impl ReqwestInflight {
    /// Checks once whether the request has finished.
    ///
    /// Returns `Some(result)` when a result is available and `None` otherwise.
    /// On wasm a failed `try_recv` (nothing received yet, or any other receive
    /// error) is reported as `None`.
    fn poll(&mut self) -> Option<Resp> {
        #[cfg(target_family = "wasm")]
        {
            self.res.try_recv().ok()
        }
        #[cfg(not(target_family = "wasm"))]
        {
            // `poll_once` polls the task a single time and resolves right
            // away, so `block_on` does not wait for the request to complete.
            future::block_on(future::poll_once(&mut self.res))
        }
    }

    /// Wraps the receiving end of the result channel together with the
    /// request URL.
    #[cfg(target_family = "wasm")]
    pub(crate) fn new(res: Receiver<Resp>, url: String) -> Self {
        Self { url, res }
    }

    /// Wraps the spawned task together with the request URL.
    #[cfg(not(target_family = "wasm"))]
    pub(crate) fn new(res: Task<Resp>, url: String) -> Self {
        Self { url, res }
    }
}
/// Response metadata captured before the body is consumed.
// NOTE(review): no use of `Parts` as a component is visible in this file, so
// the `Component` derive may be unnecessary — confirm before removing.
#[derive(Component, Debug)]
struct Parts {
/// HTTP status code of the response.
pub(crate) status: StatusCode,
/// Headers of the response.
pub(crate) headers: HeaderMap,
}
/// Event triggered when a request completed and its body was read successfully.
#[derive(Clone, EntityEvent, Debug)]
pub struct ReqwestResponseEvent {
// Entity that carried the request.
entity: Entity,
// Raw response body.
bytes: bytes::Bytes,
// HTTP status code of the response.
status: StatusCode,
// Headers of the response.
headers: HeaderMap,
}
/// Event triggered when a request finished with a `reqwest::Error`, either
/// while executing the request or while reading the response body.
#[derive(EntityEvent, Debug)]
pub struct ReqwestErrorEvent {
/// Entity that carried the request.
pub entity: Entity,
/// The error reported by `reqwest`.
pub error: reqwest::Error,
}
impl ReqwestResponseEvent {
#[inline]
pub fn body(&self) -> &bytes::Bytes {
&self.bytes
}
pub fn as_str(&self) -> anyhow::Result<&str> {
let s = std::str::from_utf8(&self.bytes)?;
Ok(s)
}
pub fn as_string(&self) -> anyhow::Result<String> {
Ok(self.as_str()?.to_string())
}
#[cfg(feature = "json")]
pub fn deserialize_json<'de, T: serde::Deserialize<'de>>(&'de self) -> anyhow::Result<T> {
Ok(serde_json::from_str(self.as_str()?)?)
}
#[cfg(feature = "msgpack")]
pub fn deserialize_msgpack<'de, T: serde::Deserialize<'de>>(&'de self) -> anyhow::Result<T> {
Ok(rmp_serde::decode::from_slice(self.body())?)
}
#[inline]
pub fn status(&self) -> StatusCode {
self.status
}
#[inline]
pub fn response_headers(&self) -> &HeaderMap {
&self.headers
}
}
/// JSON support; only compiled when the `json` feature is enabled.
#[cfg(feature = "json")]
pub mod json {
use bevy::{ecs::entity::Entity, prelude::EntityEvent};
/// Event carrying a response body that was deserialized from JSON into `T`.
#[derive(EntityEvent)]
pub struct JsonResponse<T> {
/// Entity that carried the request.
pub entity: Entity,
/// The deserialized payload.
pub data: T,
}
}
impl ReqwestResponseEvent {
pub(crate) fn new(
entity: Entity,
bytes: bytes::Bytes,
status: StatusCode,
headers: HeaderMap,
) -> Self {
Self {
entity,
bytes,
status,
headers,
}
}
}