nautilus_common/msgbus/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Message handler functionality for the message bus system.
17//!
18//! This module provides a trait and implementations for handling messages
19//! in a type-safe manner, enabling both typed and untyped message processing.
20
21use std::{
22    any::{Any, type_name},
23    fmt::Debug,
24    marker::PhantomData,
25    rc::Rc,
26};
27
28use nautilus_core::UUID4;
29use ustr::Ustr;
30
31pub trait MessageHandler: Any {
32    /// Returns the unique identifier for this handler.
33    fn id(&self) -> Ustr;
34    /// Handles a message of any type.
35    fn handle(&self, message: &dyn Any);
36    /// Returns this handler as a trait object.
37    fn as_any(&self) -> &dyn Any;
38}
39
40impl PartialEq for dyn MessageHandler {
41    fn eq(&self, other: &Self) -> bool {
42        self.id() == other.id()
43    }
44}
45
46impl Eq for dyn MessageHandler {}
47
48#[derive(Debug)]
49pub struct TypedMessageHandler<T: 'static + ?Sized, F: Fn(&T) + 'static> {
50    id: Ustr,
51    callback: F,
52    _phantom: PhantomData<T>,
53}
54
55impl<T: 'static, F: Fn(&T) + 'static> TypedMessageHandler<T, F> {
56    /// Creates a new handler with an optional custom ID.
57    pub fn new<S: AsRef<str>>(id: Option<S>, callback: F) -> Self {
58        let id_ustr = id.map_or_else(
59            || generate_handler_id(&callback),
60            |s| Ustr::from(s.as_ref()),
61        );
62
63        Self {
64            id: id_ustr,
65            callback,
66            _phantom: PhantomData,
67        }
68    }
69
70    /// Creates a new handler with an auto-generated ID.
71    pub fn from(callback: F) -> Self {
72        Self::new::<Ustr>(None, callback)
73    }
74}
75
76impl<T: 'static, F: Fn(&T) + 'static> MessageHandler for TypedMessageHandler<T, F> {
77    fn id(&self) -> Ustr {
78        self.id
79    }
80
81    fn handle(&self, message: &dyn Any) {
82        if let Some(typed_msg) = message.downcast_ref::<T>() {
83            (self.callback)(typed_msg);
84        } else {
85            log::error!(
86                "TypedMessageHandler downcast failed: expected {} got {:?}",
87                type_name::<T>(),
88                message.type_id()
89            );
90        }
91    }
92
93    fn as_any(&self) -> &dyn Any {
94        self
95    }
96}
97
98impl<F: Fn(&dyn Any) + 'static> TypedMessageHandler<dyn Any, F> {
99    /// Creates a new handler for dynamic Any messages with an optional custom ID.
100    pub fn new_any<S: AsRef<str>>(id: Option<S>, callback: F) -> Self {
101        let id_ustr = id.map_or_else(
102            || generate_handler_id(&callback),
103            |s| Ustr::from(s.as_ref()),
104        );
105
106        Self {
107            id: id_ustr,
108            callback,
109            _phantom: PhantomData,
110        }
111    }
112
113    /// Creates a handler for Any messages with an optional ID.
114    pub fn from_any<S: AsRef<str>>(id_opt: Option<S>, callback: F) -> Self {
115        Self::new_any(id_opt, callback)
116    }
117
118    /// Creates a handler for Any messages with an auto-generated ID.
119    pub fn with_any(callback: F) -> Self {
120        Self::new_any::<&str>(None, callback)
121    }
122}
123
124impl<F: Fn(&dyn Any) + 'static> MessageHandler for TypedMessageHandler<dyn Any, F> {
125    fn id(&self) -> Ustr {
126        self.id
127    }
128
129    fn handle(&self, message: &dyn Any) {
130        (self.callback)(message);
131    }
132
133    fn as_any(&self) -> &dyn Any {
134        self
135    }
136}
137
138fn generate_handler_id<T: 'static + ?Sized, F: 'static + Fn(&T)>(callback: &F) -> Ustr {
139    let callback_ptr = std::ptr::from_ref(callback);
140    let uuid = UUID4::new();
141    Ustr::from(&format!("<{callback_ptr:?}>-{uuid}"))
142}
143
144// ShareableMessageHandler contains Rc<dyn MessageHandler> which is not Send/Sync.
145// This is intentional - message handlers are designed for single-threaded use within
146// each async runtime. The MessageBus uses thread-local storage to ensure each thread
147// gets its own handlers, eliminating the need for unsafe Send/Sync implementations.
148#[repr(transparent)]
149#[derive(Clone)]
150pub struct ShareableMessageHandler(pub Rc<dyn MessageHandler>);
151
152impl ShareableMessageHandler {
153    pub fn id(&self) -> Ustr {
154        self.0.id()
155    }
156}
157
158impl Debug for ShareableMessageHandler {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        f.debug_struct(stringify!(ShareableMessageHandler))
161            .field("id", &self.0.id())
162            .field("type", &std::any::type_name::<Self>().to_string())
163            .finish()
164    }
165}
166
167impl From<Rc<dyn MessageHandler>> for ShareableMessageHandler {
168    fn from(value: Rc<dyn MessageHandler>) -> Self {
169        Self(value)
170    }
171}