1use std::collections::HashMap;
8use std::panic;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::{Arc, Mutex, OnceLock};
11use tokio::sync::{mpsc, oneshot};
12
/// Outcome delivered to a registered callback.
#[derive(Debug, Clone)]
pub enum CallbackResult {
    /// The operation succeeded and produced this payload.
    Success(String),
    /// The operation failed with this numeric error code.
    Error(u32),
}

impl CallbackResult {
    /// Converts this value into the equivalent standard [`Result`].
    ///
    /// `Success(data)` becomes `Ok(data)` and `Error(code)` becomes `Err(code)`.
    pub fn into_result(self) -> Result<String, u32> {
        match self {
            Self::Error(code) => Err(code),
            Self::Success(payload) => Ok(payload),
        }
    }
}
31
32enum CallbackEntry {
33 Oneshot(oneshot::Sender<CallbackResult>),
34 Stream(mpsc::Sender<CallbackResult>),
35 Handler(Arc<dyn Fn(CallbackResult) + Send + Sync>),
36}
37
38struct CallbackRegistry {
39 callbacks: Mutex<HashMap<u64, CallbackEntry>>,
40 next_id: AtomicU64,
41}
42
43impl CallbackRegistry {
44 fn new() -> Self {
45 Self {
46 callbacks: Mutex::new(HashMap::new()),
47 next_id: AtomicU64::new(1),
48 }
49 }
50
51 fn register_oneshot(&self) -> (u64, oneshot::Receiver<CallbackResult>) {
52 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
53 let (sender, receiver) = oneshot::channel();
54
55 {
56 let mut callbacks = self.callbacks.lock().unwrap();
57 callbacks.insert(id, CallbackEntry::Oneshot(sender));
58 }
59
60 (id, receiver)
61 }
62
63 fn register_stream(&self) -> (u64, mpsc::Receiver<CallbackResult>) {
64 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
65 let (sender, receiver) = mpsc::channel(16); {
68 let mut callbacks = self.callbacks.lock().unwrap();
69 callbacks.insert(id, CallbackEntry::Stream(sender));
70 }
71
72 (id, receiver)
73 }
74
75 fn register_handler<F>(&self, handler: F) -> u64
76 where
77 F: Fn(CallbackResult) + Send + Sync + 'static,
78 {
79 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
80
81 {
82 let mut callbacks = self.callbacks.lock().unwrap();
83 callbacks.insert(id, CallbackEntry::Handler(Arc::new(handler)));
84 }
85
86 id
87 }
88
89 fn unregister(&self, id: u64) -> bool {
90 let mut callbacks = self.callbacks.lock().unwrap();
91 callbacks.remove(&id).is_some()
92 }
93
94 fn invoke(&self, id: u64, result: CallbackResult) -> bool {
95 enum Action {
96 Oneshot(oneshot::Sender<CallbackResult>),
97 Stream(mpsc::Sender<CallbackResult>),
98 Handler(Arc<dyn Fn(CallbackResult) + Send + Sync>),
99 None,
100 }
101
102 let action = {
103 let mut callbacks = self.callbacks.lock().unwrap();
104 match callbacks.get(&id) {
105 Some(CallbackEntry::Oneshot(_)) => {
106 if let Some(CallbackEntry::Oneshot(sender)) = callbacks.remove(&id) {
107 Action::Oneshot(sender)
108 } else {
109 Action::None
110 }
111 }
112 Some(CallbackEntry::Stream(sender)) => Action::Stream(sender.clone()),
113 Some(CallbackEntry::Handler(handler)) => Action::Handler(handler.clone()),
114 None => Action::None,
115 }
116 };
117
118 match action {
119 Action::Oneshot(sender) => {
120 let _ = sender.send(result);
121 true
122 }
123 Action::Stream(sender) => match sender.try_send(result) {
124 Ok(_) => true,
125 Err(mpsc::error::TrySendError::Full(_payload)) => {
126 false
128 }
129 Err(mpsc::error::TrySendError::Closed(_payload)) => {
130 let mut callbacks = self.callbacks.lock().unwrap();
132 callbacks.remove(&id);
133 false
134 }
135 },
136 Action::Handler(handler) => {
137 let handled = panic::catch_unwind(panic::AssertUnwindSafe(|| (handler)(result)));
138 if handled.is_err() {
139 let mut callbacks = self.callbacks.lock().unwrap();
140 callbacks.remove(&id);
141 false
142 } else {
143 true
144 }
145 }
146 Action::None => false,
147 }
148 }
149}
150
151static CALLBACK_REGISTRY: OnceLock<CallbackRegistry> = OnceLock::new();
152
153fn get_callback_registry() -> &'static CallbackRegistry {
154 CALLBACK_REGISTRY.get_or_init(CallbackRegistry::new)
155}
156
157pub fn get_callback() -> (u64, oneshot::Receiver<CallbackResult>) {
159 get_callback_registry().register_oneshot()
160}
161
162pub fn get_stream_callback() -> (u64, mpsc::Receiver<CallbackResult>) {
164 get_callback_registry().register_stream()
165}
166
167pub fn register_handler<F>(handler: F) -> u64
171where
172 F: Fn(CallbackResult) + Send + Sync + 'static,
173{
174 get_callback_registry().register_handler(handler)
175}
176
177pub fn remove_callback(id: u64) -> bool {
179 get_callback_registry().unregister(id)
180}
181
182pub fn invoke_callback(id: u64, result: Result<String, u32>) -> bool {
187 let cb_result = match result {
188 Ok(data) => CallbackResult::Success(data),
189 Err(code) => CallbackResult::Error(code),
190 };
191 get_callback_registry().invoke(id, cb_result)
192}
193
/// A named event together with its string payload.
#[derive(Debug, Clone)]
pub struct Event {
    /// Name the event was published under.
    pub name: String,
    /// Payload supplied by the publisher.
    pub data: String,
}
200
201struct EventRegistry {
202 listeners: Mutex<HashMap<String, Vec<mpsc::Sender<Event>>>>,
203}
204
205impl EventRegistry {
206 fn new() -> Self {
207 Self {
208 listeners: Mutex::new(HashMap::new()),
209 }
210 }
211
212 fn subscribe(&self, event_name: String) -> mpsc::Receiver<Event> {
213 let (sender, receiver) = mpsc::channel(16); let mut listeners = self.listeners.lock().unwrap();
216 listeners.entry(event_name).or_default().push(sender);
217
218 receiver
219 }
220
221 fn publish(&self, name: &str, data: &str) {
222 let mut listeners = self.listeners.lock().unwrap();
223
224 if let Some(senders) = listeners.get_mut(name) {
225 let event = Event {
226 name: name.to_string(),
227 data: data.to_string(),
228 };
229 senders.retain(|sender| {
232 match sender.try_send(event.clone()) {
233 Ok(_) => true, Err(mpsc::error::TrySendError::Full(_)) => true, Err(mpsc::error::TrySendError::Closed(_)) => false, }
237 });
238 }
239 }
240}
241
242static EVENT_REGISTRY: OnceLock<EventRegistry> = OnceLock::new();
243
244fn get_event_registry() -> &'static EventRegistry {
245 EVENT_REGISTRY.get_or_init(EventRegistry::new)
246}
247
248pub fn subscribe(event_name: String) -> mpsc::Receiver<Event> {
252 get_event_registry().subscribe(event_name)
253}
254
255pub fn publish(name: String, data: String) {
261 get_event_registry().publish(&name, &data);
262}