1use futures::{
2 channel::mpsc::{unbounded, UnboundedSender},
3 Future, Stream, StreamExt,
4};
5
6use serde::{de::DeserializeOwned, Deserialize, Serialize};
7use std::{borrow::Cow, cell::RefCell, ffi::CStr, rc::Rc};
8
9use rustc_hash::FxHashMap;
10
11use crate::invoker::Val;
12
/// An event in owned form: where it came from plus its undecoded payload bytes.
#[derive(Debug)]
pub struct RawEvent {
    /// Source identifier of the event (empty when the dispatcher found no known prefix).
    pub source: String,
    /// Payload bytes exactly as they were received, not yet deserialized.
    pub payload: Vec<u8>,
}
21
/// A borrowed view of an event, valid only for the duration of a dispatch call.
pub struct RawEventRef<'a> {
    /// Source identifier; usually borrowed from the incoming string.
    source: Cow<'a, str>,
    /// Undecoded payload bytes borrowed from the caller's buffer.
    payload: &'a [u8],
}
26
27impl<'a> RawEventRef<'a> {
28 fn to_raw_event(&self) -> RawEvent {
29 RawEvent {
30 source: self.source.to_string(),
31 payload: self.payload.into(),
32 }
33 }
34}
35
36struct EventSub {
37 scope: EventScope,
38 handler: EventHandler,
39}
40
41enum EventHandler {
42 Future(UnboundedSender<RawEvent>),
43 Function(Box<dyn Fn(RawEventRef) + 'static>),
44}
45
46thread_local! {
48 static EVENTS: RefCell<FxHashMap<String, EventSub>> = RefCell::new(FxHashMap::default());
49}
50
51#[doc(hidden)]
52#[no_mangle]
53pub unsafe extern "C" fn __cfx_on_event(
54 cstring: *const i8,
55 args: *const u8,
56 args_length: u32,
57 source: *const i8,
58) {
59 let name = CStr::from_ptr(cstring).to_str().unwrap();
60 let payload = std::slice::from_raw_parts(args, args_length as _);
61 let source = CStr::from_ptr(source).to_str().unwrap();
62
63 EVENTS.with(|events| {
64 let events = events.borrow();
65
66 if let Some(sub) = events.get(name) {
67 let source = if source.starts_with("net:") {
68 if sub.scope != EventScope::Network {
69 return;
70 }
71
72 Cow::from(source.strip_prefix("net:").unwrap())
73 } else if
74 source.starts_with("internal-net:") {
76 Cow::from(source.strip_prefix("internal-net:").unwrap())
77 } else {
78 Cow::from("")
79 };
80
81 let event = RawEventRef { source, payload };
82
83 match sub.handler {
84 EventHandler::Function(ref func) => {
85 func(event);
86 }
87
88 EventHandler::Future(ref sender) => {
89 let _ = sender.unbounded_send(event.to_raw_event());
90 }
91 }
92 }
93 });
94
95 crate::runtime::LOCAL_POOL.with(|lp| {
96 if let Ok(mut lp) = lp.try_borrow_mut() {
97 lp.run_until_stalled();
98 }
99 });
100}
101
102pub struct Event<'de, T: Deserialize<'de>> {
104 source: Cow<'de, str>,
105 payload: T,
106}
107
108impl<'de, T: Deserialize<'de>> Event<'de, T> {
109 pub fn source(&self) -> &str {
111 &self.source
112 }
113
114 pub fn payload(&self) -> &T {
116 &self.payload
117 }
118
119 pub fn into_inner(self) -> T {
121 self.payload
122 }
123}
124
125pub struct EventOwned<T: DeserializeOwned> {
126 source: String,
127 payload: T,
128}
129
/// Controls which event origins a subscription accepts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventScope {
    /// Events whose source carries the `net:` prefix are not delivered.
    Local,
    /// Events whose source carries the `net:` prefix are delivered as well.
    Network,
}
135
136pub fn subscribe<'a, In>(event_name: &'a str, scope: EventScope) -> impl Stream<Item = Event<In>>
161where
162 for<'de> In: Deserialize<'de> + 'a,
163{
164 let mut events = subscribe_raw(event_name, scope);
165
166 async_stream::stream! {
167 while let Some(event) = events.next().await {
168 if let Ok(payload) = rmp_serde::from_read_ref(&event.payload) {
169 let event = Event {
170 source: Cow::from(event.source),
171 payload,
172 };
173
174 yield event;
175 }
176 }
177 }
178}
179
180pub fn subscribe_raw(event_name: &str, scope: EventScope) -> impl Stream<Item = RawEvent> {
182 let (tx, rx) = unbounded();
183
184 EVENTS.with(|events| {
185 let sub = EventSub {
186 scope,
187 handler: EventHandler::Future(tx),
188 };
189
190 let mut events = events.borrow_mut();
191 events.insert(event_name.to_owned(), sub);
192 });
193
194 let _ = crate::invoker::register_resource_as_event_handler(event_name);
195
196 rx
197}
198
199pub fn set_event_handler_closure<In, Handler>(event_name: &str, handler: Handler, scope: EventScope)
207where
208 Handler: Fn(Event<In>) + 'static,
209 In: DeserializeOwned,
210{
211 let raw_handler = move |raw_event: RawEventRef| {
212 let RawEventRef {
213 source, payload, ..
214 } = raw_event;
215
216 let event = rmp_serde::from_read_ref::<_, In>(&payload).ok();
217
218 if let Some(payload) = event {
219 let event = Event { source, payload };
220
221 handler(event);
222 }
223 };
224
225 EVENTS.with(|events| {
226 let sub = EventSub {
227 scope,
228 handler: EventHandler::Function(Box::new(raw_handler)),
229 };
230
231 let mut events = events.borrow_mut();
232 events.insert(event_name.to_owned(), sub);
233 });
234
235 let _ = crate::invoker::register_resource_as_event_handler(event_name);
236}
237
238pub fn emit<T: Serialize>(event_name: &str, payload: T) {
240 if let Ok(payload) = rmp_serde::to_vec_named(&payload) {
241 let args = &[
242 Val::String(event_name),
243 Val::Bytes(&payload),
244 Val::Integer(payload.len() as _),
245 ];
246
247 let _ = crate::invoker::invoke::<(), _>(0x91310870, args); }
249}
250
251pub trait Handler<Input: DeserializeOwned> {
252 type Response;
253 type Error;
254 type Future: Future<Output = Result<Self::Response, Self::Error>>;
255
256 fn handle(&mut self, source: String, event: Input) -> Self::Future;
257}
258
259pub fn set_event_handler<H, T>(event_name: &str, handler: H, scope: EventScope)
260where
261 H: Handler<T> + 'static,
262 T: DeserializeOwned + 'static,
263{
264 let handler = Rc::new(RefCell::new(handler));
265
266 let raw_handler = move |raw_event: RawEventRef| {
267 let RawEventRef {
268 source, payload, ..
269 } = raw_event;
270
271 let event = rmp_serde::from_read::<_, T>(payload).ok();
272
273 if let Some(payload) = event {
274 let handler = handler.clone();
280 let source = source.to_string();
281
282 let _ = crate::runtime::spawn(async move {
283 let _ = handler.borrow_mut().handle(source, payload);
284 });
285 }
286 };
287
288 EVENTS.with(|events| {
289 let sub = EventSub {
290 scope,
291 handler: EventHandler::Function(Box::new(raw_handler)),
292 };
293
294 let mut events = events.borrow_mut();
295 events.insert(event_name.to_owned(), sub);
296 });
297
298 let _ = crate::invoker::register_resource_as_event_handler(event_name);
299}
300
/// Wrapper that lets a plain function or closure be used as a handler.
pub struct HandlerFn<T> {
    /// The wrapped callable.
    func: T,
}
304
305pub fn handler_fn<T>(func: T) -> HandlerFn<T> {
306 HandlerFn { func }
307}
308
309impl<T, F, Input, R, E> Handler<Input> for HandlerFn<T>
310where
311 T: FnMut(String, Input) -> F,
312 F: Future<Output = Result<R, E>>,
313 Input: DeserializeOwned,
314{
315 type Response = R;
316 type Error = E;
317 type Future = F;
318
319 fn handle(&mut self, source: String, event: Input) -> Self::Future {
320 (self.func)(source, event)
321 }
322}