Skip to main content

lingxia_messaging/
lib.rs

//! LingXia Messaging System
//!
//! Provides two core functionalities for cross-platform communication:
//! 1. A flexible callback registry that supports oneshot, stream, and handler callbacks.
//! 2. A publish-subscribe system for system-wide events.
6
7use std::collections::HashMap;
8use std::panic;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::{Arc, Mutex, OnceLock};
11use tokio::sync::{mpsc, oneshot};
12
/// Outcome reported by platform code for a registered callback.
///
/// Converts losslessly to and from `Result<String, u32>` via `From`/`Into`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CallbackResult {
    /// Success with JSON payload
    Success(String),
    /// Error with a specific error code (defined in i18n)
    Error(u32),
}

impl CallbackResult {
    /// Convert to a `Result` for easier handling in the logic crate.
    ///
    /// `Success(data)` becomes `Ok(data)` and `Error(code)` becomes `Err(code)`.
    pub fn into_result(self) -> Result<String, u32> {
        match self {
            CallbackResult::Success(data) => Ok(data),
            CallbackResult::Error(code) => Err(code),
        }
    }
}

impl From<Result<String, u32>> for CallbackResult {
    /// Maps `Ok(data)` to `Success(data)` and `Err(code)` to `Error(code)`.
    fn from(result: Result<String, u32>) -> Self {
        match result {
            Ok(data) => CallbackResult::Success(data),
            Err(code) => CallbackResult::Error(code),
        }
    }
}

impl From<CallbackResult> for Result<String, u32> {
    /// Same mapping as [`CallbackResult::into_result`].
    fn from(result: CallbackResult) -> Self {
        result.into_result()
    }
}
31
32enum CallbackEntry {
33    Oneshot(oneshot::Sender<CallbackResult>),
34    Stream(mpsc::Sender<CallbackResult>),
35    Handler(Arc<dyn Fn(CallbackResult) + Send + Sync>),
36}
37
38struct CallbackRegistry {
39    callbacks: Mutex<HashMap<u64, CallbackEntry>>,
40    next_id: AtomicU64,
41}
42
43impl CallbackRegistry {
44    fn new() -> Self {
45        Self {
46            callbacks: Mutex::new(HashMap::new()),
47            next_id: AtomicU64::new(1),
48        }
49    }
50
51    fn register_oneshot(&self) -> (u64, oneshot::Receiver<CallbackResult>) {
52        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
53        let (sender, receiver) = oneshot::channel();
54
55        {
56            let mut callbacks = self.callbacks.lock().unwrap();
57            callbacks.insert(id, CallbackEntry::Oneshot(sender));
58        }
59
60        (id, receiver)
61    }
62
63    fn register_stream(&self) -> (u64, mpsc::Receiver<CallbackResult>) {
64        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
65        let (sender, receiver) = mpsc::channel(16); // Buffer size of 16
66
67        {
68            let mut callbacks = self.callbacks.lock().unwrap();
69            callbacks.insert(id, CallbackEntry::Stream(sender));
70        }
71
72        (id, receiver)
73    }
74
75    fn register_handler<F>(&self, handler: F) -> u64
76    where
77        F: Fn(CallbackResult) + Send + Sync + 'static,
78    {
79        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
80
81        {
82            let mut callbacks = self.callbacks.lock().unwrap();
83            callbacks.insert(id, CallbackEntry::Handler(Arc::new(handler)));
84        }
85
86        id
87    }
88
89    fn unregister(&self, id: u64) -> bool {
90        let mut callbacks = self.callbacks.lock().unwrap();
91        callbacks.remove(&id).is_some()
92    }
93
94    fn invoke(&self, id: u64, result: CallbackResult) -> bool {
95        enum Action {
96            Oneshot(oneshot::Sender<CallbackResult>),
97            Stream(mpsc::Sender<CallbackResult>),
98            Handler(Arc<dyn Fn(CallbackResult) + Send + Sync>),
99            None,
100        }
101
102        let action = {
103            let mut callbacks = self.callbacks.lock().unwrap();
104            match callbacks.get(&id) {
105                Some(CallbackEntry::Oneshot(_)) => {
106                    if let Some(CallbackEntry::Oneshot(sender)) = callbacks.remove(&id) {
107                        Action::Oneshot(sender)
108                    } else {
109                        Action::None
110                    }
111                }
112                Some(CallbackEntry::Stream(sender)) => Action::Stream(sender.clone()),
113                Some(CallbackEntry::Handler(handler)) => Action::Handler(handler.clone()),
114                None => Action::None,
115            }
116        };
117
118        match action {
119            Action::Oneshot(sender) => {
120                let _ = sender.send(result);
121                true
122            }
123            Action::Stream(sender) => match sender.try_send(result) {
124                Ok(_) => true,
125                Err(mpsc::error::TrySendError::Full(_payload)) => {
126                    // Channel is full; report failure so caller can retry
127                    false
128                }
129                Err(mpsc::error::TrySendError::Closed(_payload)) => {
130                    // Channel is closed, remove the callback
131                    let mut callbacks = self.callbacks.lock().unwrap();
132                    callbacks.remove(&id);
133                    false
134                }
135            },
136            Action::Handler(handler) => {
137                let handled = panic::catch_unwind(panic::AssertUnwindSafe(|| (handler)(result)));
138                if handled.is_err() {
139                    let mut callbacks = self.callbacks.lock().unwrap();
140                    callbacks.remove(&id);
141                    false
142                } else {
143                    true
144                }
145            }
146            Action::None => false,
147        }
148    }
149}
150
151static CALLBACK_REGISTRY: OnceLock<CallbackRegistry> = OnceLock::new();
152
153fn get_callback_registry() -> &'static CallbackRegistry {
154    CALLBACK_REGISTRY.get_or_init(CallbackRegistry::new)
155}
156
157/// Register a oneshot callback and get its receiver.
158pub fn get_callback() -> (u64, oneshot::Receiver<CallbackResult>) {
159    get_callback_registry().register_oneshot()
160}
161
162/// Register a stream callback and get its receiver.
163pub fn get_stream_callback() -> (u64, mpsc::Receiver<CallbackResult>) {
164    get_callback_registry().register_stream()
165}
166
167/// Register a handler callback. The handler is executed immediately on the thread
168/// that calls `invoke_callback` with the returned ID. Use `remove_callback(id)`
169/// to unregister when no longer needed.
170pub fn register_handler<F>(handler: F) -> u64
171where
172    F: Fn(CallbackResult) + Send + Sync + 'static,
173{
174    get_callback_registry().register_handler(handler)
175}
176
177/// Remove callback by ID. This is useful for cancellation or timeout scenarios.
178pub fn remove_callback(id: u64) -> bool {
179    get_callback_registry().unregister(id)
180}
181
182/// Invoke callback (called from platform code) to send result back.
183/// - Oneshot: removes the callback after sending.
184/// - Stream: keeps the callback active; returns false if the channel is full or closed.
185/// - Handler: executes immediately on the caller's thread; drops the handler on panic.
186pub fn invoke_callback(id: u64, result: Result<String, u32>) -> bool {
187    let cb_result = match result {
188        Ok(data) => CallbackResult::Success(data),
189        Err(code) => CallbackResult::Error(code),
190    };
191    get_callback_registry().invoke(id, cb_result)
192}
193
/// Represents a system-wide event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Event {
    /// Name the event was published under; subscribers are matched on it.
    pub name: String,
    /// Payload supplied by the publisher.
    pub data: String,
}
200
201struct EventRegistry {
202    listeners: Mutex<HashMap<String, Vec<mpsc::Sender<Event>>>>,
203}
204
205impl EventRegistry {
206    fn new() -> Self {
207        Self {
208            listeners: Mutex::new(HashMap::new()),
209        }
210    }
211
212    fn subscribe(&self, event_name: String) -> mpsc::Receiver<Event> {
213        let (sender, receiver) = mpsc::channel(16); // Channel with a buffer of 16
214
215        let mut listeners = self.listeners.lock().unwrap();
216        listeners.entry(event_name).or_default().push(sender);
217
218        receiver
219    }
220
221    fn publish(&self, name: &str, data: &str) {
222        let mut listeners = self.listeners.lock().unwrap();
223
224        if let Some(senders) = listeners.get_mut(name) {
225            let event = Event {
226                name: name.to_string(),
227                data: data.to_string(),
228            };
229            // Use retain to keep only the active senders.
230            // A sender is considered inactive if its channel is closed.
231            senders.retain(|sender| {
232                match sender.try_send(event.clone()) {
233                    Ok(_) => true,                                      // Sent successfully, keep sender.
234                    Err(mpsc::error::TrySendError::Full(_)) => true, // Channel is full, listener is slow. Keep sender.
235                    Err(mpsc::error::TrySendError::Closed(_)) => false, // Channel is closed, listener is gone. Remove sender.
236                }
237            });
238        }
239    }
240}
241
242static EVENT_REGISTRY: OnceLock<EventRegistry> = OnceLock::new();
243
244fn get_event_registry() -> &'static EventRegistry {
245    EVENT_REGISTRY.get_or_init(EventRegistry::new)
246}
247
248/// Subscribe to a named event.
249///
250/// Returns a receiver that will get a copy of every event published with that name.
251pub fn subscribe(event_name: String) -> mpsc::Receiver<Event> {
252    get_event_registry().subscribe(event_name)
253}
254
255/// Publish an event to all subscribers.
256///
257/// This is a synchronous, non-blocking function that is safe to call from any thread,
258/// including the main UI thread. It will try to send to all listeners and will
259/// automatically clean up any listeners whose channels have been closed.
260pub fn publish(name: String, data: String) {
261    get_event_registry().publish(&name, &data);
262}