Skip to main content

nautilus_common/msgbus/
typed_handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Compile-time type-safe message handler infrastructure.
17//!
18//! This module provides generic handler traits and types that enable type-safe
19//! message dispatch without runtime downcasting for built-in message types.
20
21use std::{any::Any, fmt::Debug, marker::PhantomData, rc::Rc};
22
23use nautilus_core::UUID4;
24use ustr::Ustr;
25
26/// Compile-time type-safe message handler trait.
27///
28/// Provides zero-cost dispatch for statically typed messages. Can also be used
29/// with `dyn Any` for dynamic dispatch when type flexibility is needed.
30pub trait Handler<T: ?Sized>: 'static {
31    /// Returns the unique identifier for this handler.
32    fn id(&self) -> Ustr;
33
34    /// Handles a message of type `T`.
35    fn handle(&self, message: &T);
36}
37
38impl<T: ?Sized, H: Handler<T>> Handler<T> for Rc<H> {
39    fn id(&self) -> Ustr {
40        (**self).id()
41    }
42
43    fn handle(&self, message: &T) {
44        (**self).handle(message);
45    }
46}
47
48/// A shareable wrapper for typed handlers.
49///
50/// Provides reference-counted access to handlers. Supports both concrete types
51/// for zero-cost dispatch and `dyn Any` for dynamic dispatch.
52///
53/// # Thread Safety
54///
55/// Uses `Rc` intentionally (not `Arc`) for single-threaded use within each
56/// async runtime. The MessageBus uses thread-local storage to ensure each
57/// thread gets its own handlers.
58pub struct TypedHandler<T: 'static + ?Sized>(pub Rc<dyn Handler<T>>);
59
60impl<T: 'static + ?Sized> Clone for TypedHandler<T> {
61    fn clone(&self) -> Self {
62        Self(Rc::clone(&self.0))
63    }
64}
65
66impl<T: 'static + ?Sized> TypedHandler<T> {
67    /// Returns the handler ID.
68    pub fn id(&self) -> Ustr {
69        self.0.id()
70    }
71
72    /// Handles a message by delegating to the inner handler.
73    pub fn handle(&self, message: &T) {
74        self.0.handle(message);
75    }
76}
77
78impl<T: 'static> TypedHandler<T> {
79    /// Creates a new typed handler from any type implementing `Handler<T>`.
80    pub fn new<H: Handler<T>>(handler: H) -> Self {
81        Self(Rc::new(handler))
82    }
83
84    /// Creates a new typed handler from a callback function.
85    pub fn from<F>(callback: F) -> Self
86    where
87        F: Fn(&T) + 'static,
88    {
89        Self::new(CallbackHandler::new(None::<&str>, callback))
90    }
91
92    /// Creates a new typed handler from a callback function with a custom ID.
93    pub fn from_with_id<S: AsRef<str>, F>(id: S, callback: F) -> Self
94    where
95        F: Fn(&T) + 'static,
96    {
97        Self::new(CallbackHandler::new(Some(id), callback))
98    }
99}
100
101impl<T: 'static + ?Sized> Debug for TypedHandler<T> {
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        f.debug_struct(stringify!(TypedHandler))
104            .field("id", &self.0.id())
105            .field("type", &std::any::type_name::<T>())
106            .finish()
107    }
108}
109
110impl<T: 'static + ?Sized> PartialEq for TypedHandler<T> {
111    fn eq(&self, other: &Self) -> bool {
112        self.0.id() == other.0.id()
113    }
114}
115
116impl<T: 'static + ?Sized> Eq for TypedHandler<T> {}
117
118impl<T: 'static + ?Sized> std::hash::Hash for TypedHandler<T> {
119    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
120        self.0.id().hash(state);
121    }
122}
123
124impl From<Rc<dyn Handler<dyn Any>>> for TypedHandler<dyn Any> {
125    fn from(handler: Rc<dyn Handler<dyn Any>>) -> Self {
126        Self(handler)
127    }
128}
129
130/// Type alias for handlers that work with dynamically-typed messages.
131///
132/// This replaces the legacy `MessageHandler` trait with a unified handler
133/// type that supports both typed and dynamic dispatch through the same trait.
134pub type ShareableMessageHandler = TypedHandler<dyn Any>;
135
136/// Creates a new `ShareableMessageHandler` from an `Rc<dyn Handler<dyn Any>>`.
137#[must_use]
138pub fn shareable_handler(handler: Rc<dyn Handler<dyn Any>>) -> ShareableMessageHandler {
139    TypedHandler(handler)
140}
141
142impl ShareableMessageHandler {
143    /// Creates a handler from a typed closure that internally downcasts.
144    ///
145    /// Use this when you need Any-based routing but want type-safe handling.
146    /// The callback will only be invoked if the message downcasts successfully.
147    pub fn from_typed<T, F>(f: F) -> Self
148    where
149        T: 'static,
150        F: Fn(&T) + 'static,
151    {
152        TypedHandler(Rc::new(DowncastingHandler::new(None::<&str>, f)))
153    }
154
155    /// Creates a handler from an Any-typed closure.
156    pub fn from_any<F>(f: F) -> Self
157    where
158        F: Fn(&dyn Any) + 'static,
159    {
160        TypedHandler(Rc::new(AnyCallbackHandler::new(None::<&str>, f)))
161    }
162}
163
164/// Handler that downcasts `&dyn Any` to a concrete type before calling the callback.
165struct DowncastingHandler<T, F: Fn(&T)> {
166    id: Ustr,
167    callback: F,
168    _marker: PhantomData<T>,
169}
170
171impl<T: 'static, F: Fn(&T) + 'static> DowncastingHandler<T, F> {
172    fn new<S: AsRef<str>>(id: Option<S>, callback: F) -> Self {
173        let id_ustr = id.map_or_else(
174            || generate_handler_id::<T, F>(&callback),
175            |s| Ustr::from(s.as_ref()),
176        );
177        Self {
178            id: id_ustr,
179            callback,
180            _marker: PhantomData,
181        }
182    }
183}
184
185impl<T: 'static, F: Fn(&T) + 'static> Handler<dyn Any> for DowncastingHandler<T, F> {
186    fn id(&self) -> Ustr {
187        self.id
188    }
189
190    fn handle(&self, message: &dyn Any) {
191        if let Some(typed_msg) = message.downcast_ref::<T>() {
192            (self.callback)(typed_msg);
193        } else {
194            log::error!(
195                "DowncastingHandler downcast failed: expected {} got {:?}",
196                std::any::type_name::<T>(),
197                message.type_id()
198            );
199        }
200    }
201}
202
203/// Handler that directly receives `&dyn Any` without downcasting.
204struct AnyCallbackHandler<F: Fn(&dyn Any)> {
205    id: Ustr,
206    callback: F,
207}
208
209impl<F: Fn(&dyn Any) + 'static> AnyCallbackHandler<F> {
210    fn new<S: AsRef<str>>(id: Option<S>, callback: F) -> Self {
211        let id_ustr = id.map_or_else(
212            || {
213                let callback_ptr = std::ptr::from_ref(&callback);
214                let uuid = UUID4::new();
215                Ustr::from(&format!("<{callback_ptr:?}>-{uuid}"))
216            },
217            |s| Ustr::from(s.as_ref()),
218        );
219        Self {
220            id: id_ustr,
221            callback,
222        }
223    }
224}
225
226impl<F: Fn(&dyn Any) + 'static> Handler<dyn Any> for AnyCallbackHandler<F> {
227    fn id(&self) -> Ustr {
228        self.id
229    }
230
231    fn handle(&self, message: &dyn Any) {
232        (self.callback)(message);
233    }
234}
235
236/// A callback-based handler implementation.
237///
238/// This is the typed equivalent of `TypedMessageHandler`,
239/// but without runtime downcasting overhead.
240pub struct CallbackHandler<T, F: Fn(&T)> {
241    id: Ustr,
242    callback: F,
243    _marker: PhantomData<T>,
244}
245
246impl<T: 'static, F: Fn(&T) + 'static> CallbackHandler<T, F> {
247    /// Creates a new callback handler with an optional custom ID.
248    pub fn new<S: AsRef<str>>(id: Option<S>, callback: F) -> Self {
249        let id_ustr = id.map_or_else(
250            || generate_handler_id(&callback),
251            |s| Ustr::from(s.as_ref()),
252        );
253
254        Self {
255            id: id_ustr,
256            callback,
257            _marker: PhantomData,
258        }
259    }
260}
261
262impl<T: 'static, F: Fn(&T) + 'static> Handler<T> for CallbackHandler<T, F> {
263    fn id(&self) -> Ustr {
264        self.id
265    }
266
267    fn handle(&self, message: &T) {
268        (self.callback)(message);
269    }
270}
271
272impl<T, F: Fn(&T)> Debug for CallbackHandler<T, F> {
273    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
274        f.debug_struct(stringify!(CallbackHandler))
275            .field("id", &self.id)
276            .field("type", &std::any::type_name::<T>())
277            .finish()
278    }
279}
280
281fn generate_handler_id<T: 'static + ?Sized, F: 'static + Fn(&T)>(callback: &F) -> Ustr {
282    let callback_ptr = std::ptr::from_ref(callback);
283    let uuid = UUID4::new();
284    Ustr::from(&format!("<{callback_ptr:?}>-{uuid}"))
285}
286
287fn generate_into_handler_id<T: 'static, F: 'static + Fn(T)>(callback: &F) -> Ustr {
288    let callback_ptr = std::ptr::from_ref(callback);
289    let uuid = UUID4::new();
290    Ustr::from(&format!("<{callback_ptr:?}>-{uuid}"))
291}
292
293/// Compile-time type-safe message handler trait that takes ownership.
294///
295/// Unlike [`Handler<T>`] which borrows messages, this trait takes ownership
296/// of messages, enabling zero-copy processing when the handler needs to store
297/// or forward the message.
298pub trait IntoHandler<T>: 'static {
299    /// Returns the unique identifier for this handler.
300    fn id(&self) -> Ustr;
301
302    /// Handles a message of type `T`, taking ownership.
303    fn handle(&self, message: T);
304}
305
306impl<T, H: IntoHandler<T>> IntoHandler<T> for Rc<H> {
307    fn id(&self) -> Ustr {
308        (**self).id()
309    }
310
311    fn handle(&self, message: T) {
312        (**self).handle(message);
313    }
314}
315
316/// A shareable wrapper for ownership-based typed handlers.
317///
318/// This is the ownership-based equivalent of [`TypedHandler`], used for
319/// point-to-point messaging where the sender transfers ownership of the message.
320///
321/// # Thread Safety
322///
323/// Uses `Rc` intentionally (not `Arc`) for single-threaded use within each
324/// async runtime. The MessageBus uses thread-local storage to ensure each
325/// thread gets its own handlers.
326pub struct TypedIntoHandler<T: 'static>(pub Rc<dyn IntoHandler<T>>);
327
328impl<T: 'static> Clone for TypedIntoHandler<T> {
329    fn clone(&self) -> Self {
330        Self(Rc::clone(&self.0))
331    }
332}
333
334impl<T: 'static> TypedIntoHandler<T> {
335    /// Creates a new typed into handler from any type implementing `IntoHandler<T>`.
336    pub fn new<H: IntoHandler<T>>(handler: H) -> Self {
337        Self(Rc::new(handler))
338    }
339
340    /// Creates a new typed into handler from a callback function.
341    pub fn from<F>(callback: F) -> Self
342    where
343        F: Fn(T) + 'static,
344    {
345        Self::new(IntoCallbackHandler::new(None::<&str>, callback))
346    }
347
348    /// Creates a new typed into handler from a callback function with a custom ID.
349    pub fn from_with_id<S: AsRef<str>, F>(id: S, callback: F) -> Self
350    where
351        F: Fn(T) + 'static,
352    {
353        Self::new(IntoCallbackHandler::new(Some(id), callback))
354    }
355
356    /// Returns the handler ID.
357    pub fn id(&self) -> Ustr {
358        self.0.id()
359    }
360
361    /// Handles a message by delegating to the inner handler, taking ownership.
362    pub fn handle(&self, message: T) {
363        self.0.handle(message);
364    }
365}
366
367impl<T: 'static> Debug for TypedIntoHandler<T> {
368    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
369        f.debug_struct(stringify!(TypedIntoHandler))
370            .field("id", &self.0.id())
371            .field("type", &std::any::type_name::<T>())
372            .finish()
373    }
374}
375
376impl<T: 'static> PartialEq for TypedIntoHandler<T> {
377    fn eq(&self, other: &Self) -> bool {
378        self.0.id() == other.0.id()
379    }
380}
381
382impl<T: 'static> Eq for TypedIntoHandler<T> {}
383
384impl<T: 'static> std::hash::Hash for TypedIntoHandler<T> {
385    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
386        self.0.id().hash(state);
387    }
388}
389
390/// A callback-based handler implementation that takes ownership.
391///
392/// This is the ownership-based equivalent of `CallbackHandler`.
393pub struct IntoCallbackHandler<T, F: Fn(T)> {
394    id: Ustr,
395    callback: F,
396    _marker: PhantomData<T>,
397}
398
399impl<T: 'static, F: Fn(T) + 'static> IntoCallbackHandler<T, F> {
400    /// Creates a new into callback handler with an optional custom ID.
401    pub fn new<S: AsRef<str>>(id: Option<S>, callback: F) -> Self {
402        let id_ustr = id.map_or_else(
403            || generate_into_handler_id(&callback),
404            |s| Ustr::from(s.as_ref()),
405        );
406
407        Self {
408            id: id_ustr,
409            callback,
410            _marker: PhantomData,
411        }
412    }
413}
414
415impl<T: 'static, F: Fn(T) + 'static> IntoHandler<T> for IntoCallbackHandler<T, F> {
416    fn id(&self) -> Ustr {
417        self.id
418    }
419
420    fn handle(&self, message: T) {
421        (self.callback)(message);
422    }
423}
424
425impl<T, F: Fn(T)> Debug for IntoCallbackHandler<T, F> {
426    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
427        f.debug_struct(stringify!(IntoCallbackHandler))
428            .field("id", &self.id)
429            .field("type", &std::any::type_name::<T>())
430            .finish()
431    }
432}
433
434#[cfg(test)]
435mod tests {
436    use std::cell::RefCell;
437
438    use rstest::rstest;
439
440    use super::*;
441
442    #[rstest]
443    fn test_typed_handler_from_fn() {
444        let received = Rc::new(RefCell::new(Vec::new()));
445        let received_clone = received.clone();
446
447        let handler = TypedHandler::from(move |msg: &String| {
448            received_clone.borrow_mut().push(msg.clone());
449        });
450
451        handler.handle(&"test1".to_string());
452        handler.handle(&"test2".to_string());
453
454        assert_eq!(*received.borrow(), vec!["test1", "test2"]);
455    }
456
457    #[rstest]
458    fn test_typed_handler_with_custom_id() {
459        let handler = TypedHandler::from_with_id("custom-id", |_msg: &i32| {});
460
461        assert_eq!(handler.id().as_str(), "custom-id");
462    }
463
464    #[rstest]
465    fn test_typed_handler_equality() {
466        let handler1 = TypedHandler::from_with_id("same-id", |_: &u32| {});
467        let handler2 = TypedHandler::from_with_id("same-id", |_: &u32| {});
468        let handler3 = TypedHandler::from_with_id("different-id", |_: &u32| {});
469
470        assert_eq!(handler1, handler2);
471        assert_ne!(handler1, handler3);
472    }
473
474    #[rstest]
475    fn test_typed_handler_hash() {
476        use std::collections::HashSet;
477
478        let handler1 = TypedHandler::from_with_id("id-a", |_: &u32| {});
479        let handler2 = TypedHandler::from_with_id("id-a", |_: &u32| {});
480        let handler3 = TypedHandler::from_with_id("id-b", |_: &u32| {});
481
482        let mut set = HashSet::new();
483        set.insert(handler1);
484
485        // Same ID should be considered duplicate
486        assert!(!set.insert(handler2));
487        // Different ID should be added
488        assert!(set.insert(handler3));
489        assert_eq!(set.len(), 2);
490    }
491
492    #[rstest]
493    fn test_typed_handler_debug() {
494        let handler = TypedHandler::from_with_id("debug-test", |_: &String| {});
495        let debug_str = format!("{handler:?}");
496
497        assert!(debug_str.contains("TypedHandler"));
498        assert!(debug_str.contains("debug-test"));
499    }
500
501    // Tests for Handler<T> impl for Rc<H>
502    struct TestHandler {
503        id: Ustr,
504        call_count: RefCell<usize>,
505    }
506
507    impl TestHandler {
508        fn new(id: &str) -> Self {
509            Self {
510                id: Ustr::from(id),
511                call_count: RefCell::new(0),
512            }
513        }
514    }
515
516    impl Handler<i32> for TestHandler {
517        fn id(&self) -> Ustr {
518            self.id
519        }
520
521        fn handle(&self, _message: &i32) {
522            *self.call_count.borrow_mut() += 1;
523        }
524    }
525
526    #[rstest]
527    fn test_rc_handler_delegates_id() {
528        let handler = TestHandler::new("rc-test-id");
529        let rc_handler: Rc<TestHandler> = Rc::new(handler);
530
531        assert_eq!(rc_handler.id(), Ustr::from("rc-test-id"));
532    }
533
534    #[rstest]
535    fn test_rc_handler_delegates_handle() {
536        let handler = TestHandler::new("rc-handle-test");
537        let rc_handler: Rc<TestHandler> = Rc::new(handler);
538
539        rc_handler.handle(&42);
540        rc_handler.handle(&100);
541
542        assert_eq!(*rc_handler.call_count.borrow(), 2);
543    }
544
545    #[rstest]
546    fn test_rc_handler_shared_state() {
547        let handler = TestHandler::new("shared-state");
548        let rc1: Rc<TestHandler> = Rc::new(handler);
549        let rc2 = rc1.clone();
550
551        // Both Rc's point to same handler
552        rc1.handle(&1);
553        rc2.handle(&2);
554        rc1.handle(&3);
555
556        // All calls should be counted on the same handler
557        assert_eq!(*rc1.call_count.borrow(), 3);
558        assert_eq!(*rc2.call_count.borrow(), 3);
559    }
560
561    #[rstest]
562    fn test_typed_handler_from_rc() {
563        let handler = Rc::new(TestHandler::new("from-rc-test"));
564        let typed: TypedHandler<i32> = TypedHandler::new(handler.clone());
565
566        typed.handle(&42);
567
568        assert_eq!(*handler.call_count.borrow(), 1);
569        assert_eq!(typed.id(), Ustr::from("from-rc-test"));
570    }
571}