nautilus_common/msgbus/
handler.rs1use std::{
22 any::{Any, type_name},
23 fmt::Debug,
24 marker::PhantomData,
25 rc::Rc,
26};
27
28use nautilus_core::UUID4;
29use ustr::Ustr;
30
31pub trait MessageHandler: Any {
32 fn id(&self) -> Ustr;
34 fn handle(&self, message: &dyn Any);
36 fn as_any(&self) -> &dyn Any;
38}
39
40impl PartialEq for dyn MessageHandler {
41 fn eq(&self, other: &Self) -> bool {
42 self.id() == other.id()
43 }
44}
45
46impl Eq for dyn MessageHandler {}
47
48#[derive(Debug)]
49pub struct TypedMessageHandler<T: 'static + ?Sized, F: Fn(&T) + 'static> {
50 id: Ustr,
51 callback: F,
52 _phantom: PhantomData<T>,
53}
54
55impl<T: 'static, F: Fn(&T) + 'static> TypedMessageHandler<T, F> {
56 pub fn new<S: AsRef<str>>(id: Option<S>, callback: F) -> Self {
58 let id_ustr = id.map_or_else(
59 || generate_handler_id(&callback),
60 |s| Ustr::from(s.as_ref()),
61 );
62
63 Self {
64 id: id_ustr,
65 callback,
66 _phantom: PhantomData,
67 }
68 }
69
70 pub fn from(callback: F) -> Self {
72 Self::new::<Ustr>(None, callback)
73 }
74}
75
76impl<T: 'static, F: Fn(&T) + 'static> MessageHandler for TypedMessageHandler<T, F> {
77 fn id(&self) -> Ustr {
78 self.id
79 }
80
81 fn handle(&self, message: &dyn Any) {
82 if let Some(typed_msg) = message.downcast_ref::<T>() {
83 (self.callback)(typed_msg);
84 } else {
85 log::error!(
86 "TypedMessageHandler downcast failed: expected {} got {:?}",
87 type_name::<T>(),
88 message.type_id()
89 );
90 }
91 }
92
93 fn as_any(&self) -> &dyn Any {
94 self
95 }
96}
97
98impl<F: Fn(&dyn Any) + 'static> TypedMessageHandler<dyn Any, F> {
99 pub fn new_any<S: AsRef<str>>(id: Option<S>, callback: F) -> Self {
101 let id_ustr = id.map_or_else(
102 || generate_handler_id(&callback),
103 |s| Ustr::from(s.as_ref()),
104 );
105
106 Self {
107 id: id_ustr,
108 callback,
109 _phantom: PhantomData,
110 }
111 }
112
113 pub fn from_any<S: AsRef<str>>(id_opt: Option<S>, callback: F) -> Self {
115 Self::new_any(id_opt, callback)
116 }
117
118 pub fn with_any(callback: F) -> Self {
120 Self::new_any::<&str>(None, callback)
121 }
122}
123
124impl<F: Fn(&dyn Any) + 'static> MessageHandler for TypedMessageHandler<dyn Any, F> {
125 fn id(&self) -> Ustr {
126 self.id
127 }
128
129 fn handle(&self, message: &dyn Any) {
130 (self.callback)(message);
131 }
132
133 fn as_any(&self) -> &dyn Any {
134 self
135 }
136}
137
138fn generate_handler_id<T: 'static + ?Sized, F: 'static + Fn(&T)>(callback: &F) -> Ustr {
139 let callback_ptr = std::ptr::from_ref(callback);
140 let uuid = UUID4::new();
141 Ustr::from(&format!("<{callback_ptr:?}>-{uuid}"))
142}
143
144#[repr(transparent)]
149#[derive(Clone)]
150pub struct ShareableMessageHandler(pub Rc<dyn MessageHandler>);
151
152impl ShareableMessageHandler {
153 pub fn id(&self) -> Ustr {
154 self.0.id()
155 }
156}
157
158impl Debug for ShareableMessageHandler {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 f.debug_struct(stringify!(ShareableMessageHandler))
161 .field("id", &self.0.id())
162 .field("type", &std::any::type_name::<Self>().to_string())
163 .finish()
164 }
165}
166
167impl From<Rc<dyn MessageHandler>> for ShareableMessageHandler {
168 fn from(value: Rc<dyn MessageHandler>) -> Self {
169 Self(value)
170 }
171}