cfx_core/
events.rs

1use futures::{
2    channel::mpsc::{unbounded, UnboundedSender},
3    Future, Stream, StreamExt,
4};
5
6use serde::{de::DeserializeOwned, Deserialize, Serialize};
7use std::{borrow::Cow, cell::RefCell, ffi::CStr, rc::Rc};
8
9use rustc_hash::FxHashMap;
10
11use crate::invoker::Val;
12
/// An owned, undecoded event as delivered by an emitter.
///
/// The payload is kept as the raw bytes received; decoding into a concrete type is
/// left to the consumer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RawEvent {
    /// The source that triggered the event.
    pub source: String,
    /// The raw (still encoded) payload of the event.
    pub payload: Vec<u8>,
}
21
22pub struct RawEventRef<'a> {
23    source: Cow<'a, str>,
24    payload: &'a [u8],
25}
26
27impl<'a> RawEventRef<'a> {
28    fn to_raw_event(&self) -> RawEvent {
29        RawEvent {
30            source: self.source.to_string(),
31            payload: self.payload.into(),
32        }
33    }
34}
35
36struct EventSub {
37    scope: EventScope,
38    handler: EventHandler,
39}
40
41enum EventHandler {
42    Future(UnboundedSender<RawEvent>),
43    Function(Box<dyn Fn(RawEventRef) + 'static>),
44}
45
46// TODO: Vec<UnboundedSender<Event>>
47thread_local! {
48    static EVENTS: RefCell<FxHashMap<String, EventSub>> = RefCell::new(FxHashMap::default());
49}
50
51#[doc(hidden)]
52#[no_mangle]
53pub unsafe extern "C" fn __cfx_on_event(
54    cstring: *const i8,
55    args: *const u8,
56    args_length: u32,
57    source: *const i8,
58) {
59    let name = CStr::from_ptr(cstring).to_str().unwrap();
60    let payload = std::slice::from_raw_parts(args, args_length as _);
61    let source = CStr::from_ptr(source).to_str().unwrap();
62
63    EVENTS.with(|events| {
64        let events = events.borrow();
65
66        if let Some(sub) = events.get(name) {
67            let source = if source.starts_with("net:") {
68                if sub.scope != EventScope::Network {
69                    return;
70                }
71
72                Cow::from(source.strip_prefix("net:").unwrap())
73            } else if
74            /* is_duplicity_version && */
75            source.starts_with("internal-net:") {
76                Cow::from(source.strip_prefix("internal-net:").unwrap())
77            } else {
78                Cow::from("")
79            };
80
81            let event = RawEventRef { source, payload };
82
83            match sub.handler {
84                EventHandler::Function(ref func) => {
85                    func(event);
86                }
87
88                EventHandler::Future(ref sender) => {
89                    let _ = sender.unbounded_send(event.to_raw_event());
90                }
91            }
92        }
93    });
94
95    crate::runtime::LOCAL_POOL.with(|lp| {
96        if let Ok(mut lp) = lp.try_borrow_mut() {
97            lp.run_until_stalled();
98        }
99    });
100}
101
102/// Same as [`Event`] but without clone and allocations (for [`set_event_handler`]).
103pub struct Event<'de, T: Deserialize<'de>> {
104    source: Cow<'de, str>,
105    payload: T,
106}
107
108impl<'de, T: Deserialize<'de>> Event<'de, T> {
109    /// Get a source that triggered that event.
110    pub fn source(&self) -> &str {
111        &self.source
112    }
113
114    /// Payload of an event
115    pub fn payload(&self) -> &T {
116        &self.payload
117    }
118
119    /// Consumes `Event<T>` and returns inner `T`.
120    pub fn into_inner(self) -> T {
121        self.payload
122    }
123}
124
125pub struct EventOwned<T: DeserializeOwned> {
126    source: String,
127    payload: T,
128}
129
/// Which origins a subscription accepts events from.
///
/// Only sources prefixed with `net:` are filtered by scope; every other event is
/// delivered regardless of the scope.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EventScope {
    /// Events whose source is prefixed with `net:` are not delivered.
    Local,
    /// Events whose source is prefixed with `net:` are delivered as well.
    Network,
}
135
136/// Subscribes on an event with the given name.
137///
138/// Every time that an event is triggered this function decodes a raw message using messagepack.
139///
140/// # Example
141/// ```rust,ignore
142/// # use std::error::Error;
143/// # fn main() -> Result<(), Box<dyn Error>> {
144/// struct GiveMoney {
145///     to: String, // player name
146///     amount: u32 // amount of money
147/// }
148///
149/// let events = fivem::events::subscribe::<GiveMoney>("myCustomEvent");
150///
151/// while let Some(event) = events.next().await {
152///     let source = event.source.clone();
153///     let name_from_src = ...;
154///     let data = event.into_inner(); // consume an event and take GiveMoney
155///     let msg = format!("{} wants to give {} ${}", name_from_src, data.to, data.amount);
156/// }
157/// # Ok(())
158/// # }
159///
160pub fn subscribe<'a, In>(event_name: &'a str, scope: EventScope) -> impl Stream<Item = Event<In>>
161where
162    for<'de> In: Deserialize<'de> + 'a,
163{
164    let mut events = subscribe_raw(event_name, scope);
165
166    async_stream::stream! {
167        while let Some(event) = events.next().await {
168            if let Ok(payload) = rmp_serde::from_read_ref(&event.payload) {
169                let event = Event {
170                    source: Cow::from(event.source),
171                    payload,
172                };
173
174                yield event;
175            }
176        }
177    }
178}
179
180/// Same as [`subscribe`] but returns [`RawEvent`].
181pub fn subscribe_raw(event_name: &str, scope: EventScope) -> impl Stream<Item = RawEvent> {
182    let (tx, rx) = unbounded();
183
184    EVENTS.with(|events| {
185        let sub = EventSub {
186            scope,
187            handler: EventHandler::Future(tx),
188        };
189
190        let mut events = events.borrow_mut();
191        events.insert(event_name.to_owned(), sub);
192    });
193
194    let _ = crate::invoker::register_resource_as_event_handler(event_name);
195
196    rx
197}
198
199/// Sets an event handler.
200///
201/// The main difference between [`subscribe`] and [`set_event_handler_closure`] is that
202/// the last one calls the passed handler immediately after an event is triggered.
203///
204/// It is useful for events that contains [`crate::ref_funcs::ExternRefFunction`] to call it.
205/// Internaly this function is used in [`crate::exports::make_export`].
206pub fn set_event_handler_closure<In, Handler>(event_name: &str, handler: Handler, scope: EventScope)
207where
208    Handler: Fn(Event<In>) + 'static,
209    In: DeserializeOwned,
210{
211    let raw_handler = move |raw_event: RawEventRef| {
212        let RawEventRef {
213            source, payload, ..
214        } = raw_event;
215
216        let event = rmp_serde::from_read_ref::<_, In>(&payload).ok();
217
218        if let Some(payload) = event {
219            let event = Event { source, payload };
220
221            handler(event);
222        }
223    };
224
225    EVENTS.with(|events| {
226        let sub = EventSub {
227            scope,
228            handler: EventHandler::Function(Box::new(raw_handler)),
229        };
230
231        let mut events = events.borrow_mut();
232        events.insert(event_name.to_owned(), sub);
233    });
234
235    let _ = crate::invoker::register_resource_as_event_handler(event_name);
236}
237
238/// Emits a local event.
239pub fn emit<T: Serialize>(event_name: &str, payload: T) {
240    if let Ok(payload) = rmp_serde::to_vec_named(&payload) {
241        let args = &[
242            Val::String(event_name),
243            Val::Bytes(&payload),
244            Val::Integer(payload.len() as _),
245        ];
246
247        let _ = crate::invoker::invoke::<(), _>(0x91310870, args); // TRIGGER_EVENT_INTERNAL
248    }
249}
250
251pub trait Handler<Input: DeserializeOwned> {
252    type Response;
253    type Error;
254    type Future: Future<Output = Result<Self::Response, Self::Error>>;
255
256    fn handle(&mut self, source: String, event: Input) -> Self::Future;
257}
258
259pub fn set_event_handler<H, T>(event_name: &str, handler: H, scope: EventScope)
260where
261    H: Handler<T> + 'static,
262    T: DeserializeOwned + 'static,
263{
264    let handler = Rc::new(RefCell::new(handler));
265
266    let raw_handler = move |raw_event: RawEventRef| {
267        let RawEventRef {
268            source, payload, ..
269        } = raw_event;
270
271        let event = rmp_serde::from_read::<_, T>(payload).ok();
272
273        if let Some(payload) = event {
274            // let event = EventOwned {
275            //     source: source.to_owned().to_string(),
276            //     payload,
277            // };
278
279            let handler = handler.clone();
280            let source = source.to_string();
281
282            let _ = crate::runtime::spawn(async move {
283                let _ = handler.borrow_mut().handle(source, payload);
284            });
285        }
286    };
287
288    EVENTS.with(|events| {
289        let sub = EventSub {
290            scope,
291            handler: EventHandler::Function(Box::new(raw_handler)),
292        };
293
294        let mut events = events.borrow_mut();
295        events.insert(event_name.to_owned(), sub);
296    });
297
298    let _ = crate::invoker::register_resource_as_event_handler(event_name);
299}
300
/// Adapter that turns a closure into a [`Handler`].
///
/// Created with [`handler_fn`].
pub struct HandlerFn<T> {
    // The wrapped closure.
    func: T,
}

/// Wraps `func` in a [`HandlerFn`] so it can be passed to [`set_event_handler`].
pub fn handler_fn<T>(func: T) -> HandlerFn<T> {
    HandlerFn { func }
}
308
309impl<T, F, Input, R, E> Handler<Input> for HandlerFn<T>
310where
311    T: FnMut(String, Input) -> F,
312    F: Future<Output = Result<R, E>>,
313    Input: DeserializeOwned,
314{
315    type Response = R;
316    type Error = E;
317    type Future = F;
318
319    fn handle(&mut self, source: String, event: Input) -> Self::Future {
320        (self.func)(source, event)
321    }
322}