bevy_mod_reqwest/
lib.rs

1use std::ops::{Deref, DerefMut};
2
3use bevy::{
4    ecs::system::{EntityCommands, IntoObserverSystem, SystemParam},
5    prelude::*,
6    tasks::IoTaskPool,
7};
8
9pub use reqwest;
10
11#[cfg(target_family = "wasm")]
12use crossbeam_channel::{bounded, Receiver};
13
14#[cfg(feature = "json")]
15pub use json::*;
16
17pub use reqwest::header::HeaderMap;
18pub use reqwest::{StatusCode, Version};
19
20#[cfg(not(target_family = "wasm"))]
21use {bevy::tasks::Task, futures_lite::future};
22
23/// The [`SystemSet`] that Reqwest systems are added to.
24#[derive(Debug, Hash, PartialEq, Eq, Clone, SystemSet)]
25pub struct ReqwestSet;
26
/// Plugin that makes it possible to send HTTP requests with the
/// [reqwest](https://crates.io/crates/reqwest) library from inside bevy.
///
/// Results are delivered through [`Observer`] systems, which act as callbacks once a request has
/// finished.
///
/// Supports both wasm and native.
#[derive(Debug, Clone)]
pub struct ReqwestPlugin {
    /// When `true`, a [`Name`] component is inserted on the entity that drives the HTTP request
    /// to completion, unless that entity already has one.
    pub automatically_name_requests: bool,
}
38impl Default for ReqwestPlugin {
39    fn default() -> Self {
40        Self {
41            automatically_name_requests: true,
42        }
43    }
44}
45impl Plugin for ReqwestPlugin {
46    fn build(&self, app: &mut App) {
47        app.init_resource::<ReqwestClient>();
48
49        if self.automatically_name_requests {
50            // register a hook on the component to add a name to the entity if it doesnt have one already
51            app.world_mut()
52                .register_component_hooks::<ReqwestInflight>()
53                .on_insert(|mut world, ctx| {
54                    let url = world
55                        .get::<ReqwestInflight>(ctx.entity)
56                        .unwrap()
57                        .url
58                        .clone();
59
60                    if let None = world.get::<Name>(ctx.entity) {
61                        let mut commands = world.commands();
62                        let mut entity = commands.get_entity(ctx.entity).unwrap();
63                        entity.insert(Name::new(format!("http: {url}")));
64                    }
65                });
66        }
67        //
68        app.add_systems(
69            PreUpdate,
70            (
71                // These systems are chained, since the poll_inflight_requests will trigger the callback and mark the entity for deletion
72
73                // So if remove_finished_requests runs after poll_inflight_requests_to_bytes
74                // the entity will be removed before the callback is triggered.
75                Self::remove_finished_requests,
76                Self::poll_inflight_requests_to_bytes,
77            )
78                .chain()
79                .in_set(ReqwestSet),
80        );
81    }
82}
83
84//TODO: Make type generic, and we can create systems for JSON and TEXT requests
85impl ReqwestPlugin {
86    /// despawns finished reqwests if marked to be despawned and does not contain 'ReqwestInflight' component
87    fn remove_finished_requests(
88        mut commands: Commands,
89        q: Query<Entity, (With<DespawnReqwestEntity>, Without<ReqwestInflight>)>,
90    ) {
91        for e in q.iter() {
92            if let Ok(mut ec) = commands.get_entity(e) {
93                ec.despawn();
94            }
95        }
96    }
97
98    /// Polls any requests in flight to completion, and then removes the 'ReqwestInflight' component.
99    fn poll_inflight_requests_to_bytes(
100        mut commands: Commands,
101        mut requests: Query<(Entity, &mut ReqwestInflight)>,
102    ) {
103        for (entity, mut request) in requests.iter_mut() {
104            debug!("polling: {entity:?}");
105            if let Some((result, parts)) = request.poll() {
106                match result {
107                    Ok(body) => {
108                        // if the response is ok, the other values are already gotten, its safe to unwrap
109                        let parts = parts.unwrap();
110
111                        commands.trigger(ReqwestResponseEvent::new(
112                            entity,
113                            body.clone(),
114                            parts.status,
115                            parts.headers,
116                        ));
117                    }
118                    Err(err) => {
119                        commands.trigger(ReqwestErrorEvent { entity, error: err });
120                    }
121                }
122                if let Ok(mut ec) = commands.get_entity(entity) {
123                    ec.remove::<ReqwestInflight>();
124                }
125            }
126        }
127    }
128}
129
130/// Wrapper around EntityCommands to create the on_response and on_error
131pub struct BevyReqwestBuilder<'a>(EntityCommands<'a>);
132
133impl<'a> BevyReqwestBuilder<'a> {
134    /// Provide a system where the first argument is [`Trigger`] [`ReqwestResponseEvent`] that will run on the
135    /// response from the http request
136    ///
137    /// # Examples
138    ///
139    /// ```
140    /// use bevy::prelude::Trigger;
141    /// use bevy_mod_reqwest::ReqwestResponseEvent;
142    /// |trigger: Trigger<ReqwestResponseEvent>|  {
143    ///   bevy::log::info!("response: {:?}", trigger.event());
144    /// };
145    /// ```
146    pub fn on_response<RB: Bundle, RM, OR: IntoObserverSystem<ReqwestResponseEvent, RB, RM>>(
147        mut self,
148        onresponse: OR,
149    ) -> Self {
150        self.0.observe(onresponse);
151        self
152    }
153
154    /// Provide a system where the first argument is [`Trigger`] [`JsonResponse`] that will run on the
155    /// response from the http request, skipping some boilerplate of having to manually doing the JSON
156    /// parsing
157    ///
158    /// # Examples
159    /// ```
160    /// use bevy::prelude::Trigger;
161    /// use bevy_mod_reqwest::JsonResponse;
162    /// |trigger: Trigger<JsonResponse<T>>|  {
163    ///   bevy::log::info!("response: {:?}", trigger.event());
164    /// };
165    /// ```
166    #[cfg(feature = "json")]
167    pub fn on_json_response<
168        T: std::marker::Sync + std::marker::Send + serde::de::DeserializeOwned + 'static,
169        RB: Bundle,
170        RM,
171        OR: IntoObserverSystem<json::JsonResponse<T>, RB, RM>,
172    >(
173        mut self,
174        onresponse: OR,
175    ) -> Self {
176        self.0
177            .observe(|evt: On<ReqwestResponseEvent>, mut commands: Commands| {
178                let entity = evt.event().entity;
179                let evt = evt.event();
180                let data = evt.deserialize_json::<T>();
181
182                match data {
183                    Ok(data) => {
184                        // retrigger a new event with the serialized data
185                        commands.trigger(json::JsonResponse { entity, data });
186                    }
187                    Err(e) => {
188                        bevy::log::error!("deserialization error: {e}");
189                        bevy::log::debug!(
190                            "tried serializing: {}",
191                            evt.as_str().unwrap_or("failed getting event data")
192                        );
193                    }
194                }
195            });
196        self.0.observe(onresponse);
197        self
198    }
199
200    /// Provide a system where the first argument is [`Trigger`] [`ReqwestErrorEvent`] that will run on the
201    /// response from the http request
202    ///
203    /// # Examples
204    ///
205    /// ```
206    /// use bevy::prelude::Trigger;
207    /// use bevy_mod_reqwest::ReqwestErrorEvent;
208    /// |trigger: Trigger<ReqwestErrorEvent>|  {
209    ///   bevy::log::info!("response: {:?}", trigger.event());
210    /// };
211    /// ```
212    pub fn on_error<EB: Bundle, EM, OE: IntoObserverSystem<ReqwestErrorEvent, EB, EM>>(
213        mut self,
214        onerror: OE,
215    ) -> Self {
216        self.0.observe(onerror);
217        self
218    }
219}
220
221#[derive(SystemParam)]
222/// Systemparam to have a shorthand for creating http calls in systems
223pub struct BevyReqwest<'w, 's> {
224    commands: Commands<'w, 's>,
225    client: Res<'w, ReqwestClient>,
226}
227
228impl<'w, 's> BevyReqwest<'w, 's> {
229    /// Starts sending and processing the supplied [`reqwest::Request`]
230    /// then use the [`BevyReqwestBuilder`] to add handlers for responses and errors
231    pub fn send(&mut self, req: reqwest::Request) -> BevyReqwestBuilder {
232        let inflight = self.create_inflight_task(req);
233        BevyReqwestBuilder(self.commands.spawn((inflight, DespawnReqwestEntity)))
234    }
235
236    /// Starts sending and processing the supplied [`reqwest::Request`] on the supplied [`Entity`] if it exists
237    /// and then use the [`BevyReqwestBuilder`] to add handlers for responses and errors
238    pub fn send_using_entity(
239        &mut self,
240        entity: Entity,
241        req: reqwest::Request,
242    ) -> Result<BevyReqwestBuilder, Box<dyn std::error::Error>> {
243        let inflight = self.create_inflight_task(req);
244        let mut ec = self.commands.get_entity(entity)?;
245        info!("inserting request on entity: {:?}", entity);
246        ec.insert(inflight);
247        Ok(BevyReqwestBuilder(ec))
248    }
249
250    /// get access to the underlying ReqwestClient
251    pub fn client(&self) -> &reqwest::Client {
252        &self.client.0
253    }
254
255    fn create_inflight_task(&self, request: reqwest::Request) -> ReqwestInflight {
256        let thread_pool = IoTaskPool::get();
257        // bevy::log::debug!("Creating: {entity:?}");
258        // if we take the data, we can use it
259        let client = self.client.0.clone();
260        let url = request.url().to_string();
261
262        // wasm implementation
263        #[cfg(target_family = "wasm")]
264        let task = {
265            let (tx, task) = bounded(1);
266            thread_pool
267                .spawn(async move {
268                    let r = client.execute(request).await;
269                    let r = match r {
270                        Ok(res) => {
271                            let parts = Parts {
272                                status: res.status(),
273                                headers: res.headers().clone(),
274                            };
275                            (res.bytes().await, Some(parts))
276                        }
277                        Err(r) => (Err(r), None),
278                    };
279                    tx.send(r).ok();
280                })
281                .detach();
282            task
283        };
284
285        // otherwise
286        #[cfg(not(target_family = "wasm"))]
287        let task = {
288            thread_pool.spawn(async move {
289                let task_res = async_compat::Compat::new(async {
290                    let p = client.execute(request).await;
291                    match p {
292                        Ok(res) => {
293                            let parts = Parts {
294                                status: res.status(),
295                                headers: res.headers().clone(),
296                            };
297                            (res.bytes().await, Some(parts))
298                        }
299                        Err(e) => (Err(e), None),
300                    }
301                })
302                .await;
303                task_res
304            })
305        };
306        // put it as a component to be polled, and remove the request, it has been handled
307        ReqwestInflight::new(task, url)
308    }
309}
310
311impl<'w, 's> Deref for BevyReqwest<'w, 's> {
312    type Target = reqwest::Client;
313
314    fn deref(&self) -> &Self::Target {
315        self.client()
316    }
317}
318
319#[derive(Component)]
320/// Marker component that is used to despawn an entity if the reqwest is finshed
321pub struct DespawnReqwestEntity;
322
323#[derive(Resource)]
324/// Wrapper around the ReqwestClient, that when inserted as a resource will start connection pools towards
325/// the hosts, and also allows all the configuration from the ReqwestLibrary such as setting default headers etc
326/// to be used inside the bevy application
327pub struct ReqwestClient(pub reqwest::Client);
328impl Default for ReqwestClient {
329    fn default() -> Self {
330        Self(reqwest::Client::new())
331    }
332}
333
334impl std::ops::Deref for ReqwestClient {
335    type Target = reqwest::Client;
336    fn deref(&self) -> &Self::Target {
337        &self.0
338    }
339}
340impl DerefMut for ReqwestClient {
341    fn deref_mut(&mut self) -> &mut Self::Target {
342        &mut self.0
343    }
344}
345
346type Resp = (reqwest::Result<bytes::Bytes>, Option<Parts>);
347
348/// Dont touch these, its just to poll once every request, can be used to detect if there is an active request on the entity
349/// but should otherwise NOT be added/removed/changed by a user of this Crate
350#[derive(Component)]
351#[component(storage = "SparseSet")]
352pub struct ReqwestInflight {
353    // the url this request is handling as a string
354    pub(crate) url: String,
355    #[cfg(not(target_family = "wasm"))]
356    res: Task<Resp>,
357
358    #[cfg(target_family = "wasm")]
359    res: Receiver<Resp>,
360}
361
362impl ReqwestInflight {
363    fn poll(&mut self) -> Option<Resp> {
364        #[cfg(target_family = "wasm")]
365        if let Ok(v) = self.res.try_recv() {
366            Some(v)
367        } else {
368            None
369        }
370
371        #[cfg(not(target_family = "wasm"))]
372        if let Some(v) = future::block_on(future::poll_once(&mut self.res)) {
373            Some(v)
374        } else {
375            None
376        }
377    }
378
379    #[cfg(target_family = "wasm")]
380    pub(crate) fn new(res: Receiver<Resp>, url: String) -> Self {
381        Self { url, res }
382    }
383
384    #[cfg(not(target_family = "wasm"))]
385    pub(crate) fn new(res: Task<Resp>, url: String) -> Self {
386        Self { url, res }
387    }
388}
389
390#[derive(Component, Debug)]
391/// information about the response used to transfer headers between different stages in the async code
392struct Parts {
393    /// the `StatusCode`
394    pub(crate) status: StatusCode,
395
396    /// the headers of the response
397    pub(crate) headers: HeaderMap,
398}
399
400#[derive(Clone, EntityEvent, Debug)]
401/// the resulting data from a finished request is found here
402pub struct ReqwestResponseEvent {
403    entity: Entity,
404    bytes: bytes::Bytes,
405    status: StatusCode,
406    headers: HeaderMap,
407}
408
409#[derive(EntityEvent, Debug)]
410pub struct ReqwestErrorEvent {
411    pub entity: Entity,
412    pub error: reqwest::Error,
413}
414
415impl ReqwestResponseEvent {
416    /// retrieve a reference to the body of the response
417    #[inline]
418    pub fn body(&self) -> &bytes::Bytes {
419        &self.bytes
420    }
421
422    /// try to get the body of the response as_str
423    pub fn as_str(&self) -> anyhow::Result<&str> {
424        let s = std::str::from_utf8(&self.bytes)?;
425        Ok(s)
426    }
427    /// try to get the body of the response as an owned string
428    pub fn as_string(&self) -> anyhow::Result<String> {
429        Ok(self.as_str()?.to_string())
430    }
431    #[cfg(feature = "json")]
432    /// try to deserialize the body of the response using json
433    pub fn deserialize_json<'de, T: serde::Deserialize<'de>>(&'de self) -> anyhow::Result<T> {
434        Ok(serde_json::from_str(self.as_str()?)?)
435    }
436
437    #[cfg(feature = "msgpack")]
438    /// try to deserialize the body of the response using msgpack
439    pub fn deserialize_msgpack<'de, T: serde::Deserialize<'de>>(&'de self) -> anyhow::Result<T> {
440        Ok(rmp_serde::decode::from_slice(self.body())?)
441    }
442    #[inline]
443    /// Get the `StatusCode` of this `Response`.
444    pub fn status(&self) -> StatusCode {
445        self.status
446    }
447
448    #[inline]
449    /// Get the `Headers` of this `Response`.
450    pub fn response_headers(&self) -> &HeaderMap {
451        &self.headers
452    }
453}
454
/// Types that are only available with the `json` feature.
#[cfg(feature = "json")]
pub mod json {
    use bevy::prelude::{Entity, EntityEvent};

    /// Event carrying response data that has already been deserialized into `T`.
    #[derive(EntityEvent)]
    pub struct JsonResponse<T> {
        /// the entity the request belongs to
        pub entity: Entity,
        /// the deserialized body of the response
        pub data: T,
    }
}
464
465impl ReqwestResponseEvent {
466    pub(crate) fn new(
467        entity: Entity,
468        bytes: bytes::Bytes,
469        status: StatusCode,
470        headers: HeaderMap,
471    ) -> Self {
472        Self {
473            entity,
474            bytes,
475            status,
476            headers,
477        }
478    }
479}