1use std::ops::{Deref, DerefMut};
2
3use bevy::{
4 ecs::system::{EntityCommands, IntoObserverSystem, SystemParam},
5 prelude::*,
6 tasks::IoTaskPool,
7};
8
9pub use reqwest;
10
11#[cfg(target_family = "wasm")]
12use crossbeam_channel::{bounded, Receiver};
13
14#[cfg(feature = "json")]
15pub use json::*;
16
17pub use reqwest::header::HeaderMap;
18pub use reqwest::{StatusCode, Version};
19
20#[cfg(not(target_family = "wasm"))]
21use {bevy::tasks::Task, futures_lite::future};
22
23#[derive(Debug, Hash, PartialEq, Eq, Clone, SystemSet)]
25pub struct ReqwestSet;
26
/// Plugin that registers the [`ReqwestClient`] resource and the systems that
/// poll in-flight requests and clean up finished request entities.
pub struct ReqwestPlugin {
    /// When `true`, an entity that receives a [`ReqwestInflight`] component and
    /// has no [`Name`] yet is given one of the form `http: <url>`.
    pub automatically_name_requests: bool,
}
38impl Default for ReqwestPlugin {
39 fn default() -> Self {
40 Self {
41 automatically_name_requests: true,
42 }
43 }
44}
45impl Plugin for ReqwestPlugin {
46 fn build(&self, app: &mut App) {
47 app.init_resource::<ReqwestClient>();
48
49 if self.automatically_name_requests {
50 app.world_mut()
52 .register_component_hooks::<ReqwestInflight>()
53 .on_insert(|mut world, ctx| {
54 let url = world
55 .get::<ReqwestInflight>(ctx.entity)
56 .unwrap()
57 .url
58 .clone();
59
60 if let None = world.get::<Name>(ctx.entity) {
61 let mut commands = world.commands();
62 let mut entity = commands.get_entity(ctx.entity).unwrap();
63 entity.insert(Name::new(format!("http: {url}")));
64 }
65 });
66 }
67 app.add_systems(
69 PreUpdate,
70 (
71 Self::remove_finished_requests,
76 Self::poll_inflight_requests_to_bytes,
77 )
78 .chain()
79 .in_set(ReqwestSet),
80 );
81 }
82}
83
84impl ReqwestPlugin {
86 fn remove_finished_requests(
88 mut commands: Commands,
89 q: Query<Entity, (With<DespawnReqwestEntity>, Without<ReqwestInflight>)>,
90 ) {
91 for e in q.iter() {
92 if let Ok(mut ec) = commands.get_entity(e) {
93 ec.despawn();
94 }
95 }
96 }
97
98 fn poll_inflight_requests_to_bytes(
100 mut commands: Commands,
101 mut requests: Query<(Entity, &mut ReqwestInflight)>,
102 ) {
103 for (entity, mut request) in requests.iter_mut() {
104 debug!("polling: {entity:?}");
105 if let Some((result, parts)) = request.poll() {
106 match result {
107 Ok(body) => {
108 let parts = parts.unwrap();
110
111 commands.trigger(ReqwestResponseEvent::new(
112 entity,
113 body.clone(),
114 parts.status,
115 parts.headers,
116 ));
117 }
118 Err(err) => {
119 commands.trigger(ReqwestErrorEvent { entity, error: err });
120 }
121 }
122 if let Ok(mut ec) = commands.get_entity(entity) {
123 ec.remove::<ReqwestInflight>();
124 }
125 }
126 }
127 }
128}
129
130pub struct BevyReqwestBuilder<'a>(EntityCommands<'a>);
132
133impl<'a> BevyReqwestBuilder<'a> {
134 pub fn on_response<RB: Bundle, RM, OR: IntoObserverSystem<ReqwestResponseEvent, RB, RM>>(
147 mut self,
148 onresponse: OR,
149 ) -> Self {
150 self.0.observe(onresponse);
151 self
152 }
153
154 #[cfg(feature = "json")]
167 pub fn on_json_response<
168 T: std::marker::Sync + std::marker::Send + serde::de::DeserializeOwned + 'static,
169 RB: Bundle,
170 RM,
171 OR: IntoObserverSystem<json::JsonResponse<T>, RB, RM>,
172 >(
173 mut self,
174 onresponse: OR,
175 ) -> Self {
176 self.0
177 .observe(|evt: On<ReqwestResponseEvent>, mut commands: Commands| {
178 let entity = evt.event().entity;
179 let evt = evt.event();
180 let data = evt.deserialize_json::<T>();
181
182 match data {
183 Ok(data) => {
184 commands.trigger(json::JsonResponse { entity, data });
186 }
187 Err(e) => {
188 bevy::log::error!("deserialization error: {e}");
189 bevy::log::debug!(
190 "tried serializing: {}",
191 evt.as_str().unwrap_or("failed getting event data")
192 );
193 }
194 }
195 });
196 self.0.observe(onresponse);
197 self
198 }
199
200 pub fn on_error<EB: Bundle, EM, OE: IntoObserverSystem<ReqwestErrorEvent, EB, EM>>(
213 mut self,
214 onerror: OE,
215 ) -> Self {
216 self.0.observe(onerror);
217 self
218 }
219}
220
221#[derive(SystemParam)]
222pub struct BevyReqwest<'w, 's> {
224 commands: Commands<'w, 's>,
225 client: Res<'w, ReqwestClient>,
226}
227
228impl<'w, 's> BevyReqwest<'w, 's> {
229 pub fn send(&mut self, req: reqwest::Request) -> BevyReqwestBuilder {
232 let inflight = self.create_inflight_task(req);
233 BevyReqwestBuilder(self.commands.spawn((inflight, DespawnReqwestEntity)))
234 }
235
236 pub fn send_using_entity(
239 &mut self,
240 entity: Entity,
241 req: reqwest::Request,
242 ) -> Result<BevyReqwestBuilder, Box<dyn std::error::Error>> {
243 let inflight = self.create_inflight_task(req);
244 let mut ec = self.commands.get_entity(entity)?;
245 info!("inserting request on entity: {:?}", entity);
246 ec.insert(inflight);
247 Ok(BevyReqwestBuilder(ec))
248 }
249
250 pub fn client(&self) -> &reqwest::Client {
252 &self.client.0
253 }
254
255 fn create_inflight_task(&self, request: reqwest::Request) -> ReqwestInflight {
256 let thread_pool = IoTaskPool::get();
257 let client = self.client.0.clone();
260 let url = request.url().to_string();
261
262 #[cfg(target_family = "wasm")]
264 let task = {
265 let (tx, task) = bounded(1);
266 thread_pool
267 .spawn(async move {
268 let r = client.execute(request).await;
269 let r = match r {
270 Ok(res) => {
271 let parts = Parts {
272 status: res.status(),
273 headers: res.headers().clone(),
274 };
275 (res.bytes().await, Some(parts))
276 }
277 Err(r) => (Err(r), None),
278 };
279 tx.send(r).ok();
280 })
281 .detach();
282 task
283 };
284
285 #[cfg(not(target_family = "wasm"))]
287 let task = {
288 thread_pool.spawn(async move {
289 let task_res = async_compat::Compat::new(async {
290 let p = client.execute(request).await;
291 match p {
292 Ok(res) => {
293 let parts = Parts {
294 status: res.status(),
295 headers: res.headers().clone(),
296 };
297 (res.bytes().await, Some(parts))
298 }
299 Err(e) => (Err(e), None),
300 }
301 })
302 .await;
303 task_res
304 })
305 };
306 ReqwestInflight::new(task, url)
308 }
309}
310
311impl<'w, 's> Deref for BevyReqwest<'w, 's> {
312 type Target = reqwest::Client;
313
314 fn deref(&self) -> &Self::Target {
315 self.client()
316 }
317}
318
319#[derive(Component)]
320pub struct DespawnReqwestEntity;
322
323#[derive(Resource)]
324pub struct ReqwestClient(pub reqwest::Client);
328impl Default for ReqwestClient {
329 fn default() -> Self {
330 Self(reqwest::Client::new())
331 }
332}
333
334impl std::ops::Deref for ReqwestClient {
335 type Target = reqwest::Client;
336 fn deref(&self) -> &Self::Target {
337 &self.0
338 }
339}
340impl DerefMut for ReqwestClient {
341 fn deref_mut(&mut self) -> &mut Self::Target {
342 &mut self.0
343 }
344}
345
346type Resp = (reqwest::Result<bytes::Bytes>, Option<Parts>);
347
348#[derive(Component)]
351#[component(storage = "SparseSet")]
352pub struct ReqwestInflight {
353 pub(crate) url: String,
355 #[cfg(not(target_family = "wasm"))]
356 res: Task<Resp>,
357
358 #[cfg(target_family = "wasm")]
359 res: Receiver<Resp>,
360}
361
362impl ReqwestInflight {
363 fn poll(&mut self) -> Option<Resp> {
364 #[cfg(target_family = "wasm")]
365 if let Ok(v) = self.res.try_recv() {
366 Some(v)
367 } else {
368 None
369 }
370
371 #[cfg(not(target_family = "wasm"))]
372 if let Some(v) = future::block_on(future::poll_once(&mut self.res)) {
373 Some(v)
374 } else {
375 None
376 }
377 }
378
379 #[cfg(target_family = "wasm")]
380 pub(crate) fn new(res: Receiver<Resp>, url: String) -> Self {
381 Self { url, res }
382 }
383
384 #[cfg(not(target_family = "wasm"))]
385 pub(crate) fn new(res: Task<Resp>, url: String) -> Self {
386 Self { url, res }
387 }
388}
389
390#[derive(Component, Debug)]
391struct Parts {
393 pub(crate) status: StatusCode,
395
396 pub(crate) headers: HeaderMap,
398}
399
400#[derive(Clone, EntityEvent, Debug)]
401pub struct ReqwestResponseEvent {
403 entity: Entity,
404 bytes: bytes::Bytes,
405 status: StatusCode,
406 headers: HeaderMap,
407}
408
409#[derive(EntityEvent, Debug)]
410pub struct ReqwestErrorEvent {
411 pub entity: Entity,
412 pub error: reqwest::Error,
413}
414
415impl ReqwestResponseEvent {
416 #[inline]
418 pub fn body(&self) -> &bytes::Bytes {
419 &self.bytes
420 }
421
422 pub fn as_str(&self) -> anyhow::Result<&str> {
424 let s = std::str::from_utf8(&self.bytes)?;
425 Ok(s)
426 }
427 pub fn as_string(&self) -> anyhow::Result<String> {
429 Ok(self.as_str()?.to_string())
430 }
431 #[cfg(feature = "json")]
432 pub fn deserialize_json<'de, T: serde::Deserialize<'de>>(&'de self) -> anyhow::Result<T> {
434 Ok(serde_json::from_str(self.as_str()?)?)
435 }
436
437 #[cfg(feature = "msgpack")]
438 pub fn deserialize_msgpack<'de, T: serde::Deserialize<'de>>(&'de self) -> anyhow::Result<T> {
440 Ok(rmp_serde::decode::from_slice(self.body())?)
441 }
442 #[inline]
443 pub fn status(&self) -> StatusCode {
445 self.status
446 }
447
448 #[inline]
449 pub fn response_headers(&self) -> &HeaderMap {
451 &self.headers
452 }
453}
454
/// Types that are only available with the `json` feature.
#[cfg(feature = "json")]
pub mod json {
    use bevy::ecs::entity::Entity;
    use bevy::prelude::EntityEvent;

    /// Event carrying a response body that was deserialized from JSON into `T`.
    #[derive(EntityEvent)]
    pub struct JsonResponse<T> {
        /// Entity that carried the request.
        pub entity: Entity,
        /// The deserialized payload.
        pub data: T,
    }
}
464
465impl ReqwestResponseEvent {
466 pub(crate) fn new(
467 entity: Entity,
468 bytes: bytes::Bytes,
469 status: StatusCode,
470 headers: HeaderMap,
471 ) -> Self {
472 Self {
473 entity,
474 bytes,
475 status,
476 headers,
477 }
478 }
479}