use std::{
any::Any,
cell::RefCell,
fmt::Debug,
rc::Rc,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
};
use ahash::AHashMap;
use nautilus_core::{UUID4, message::Message};
use ustr::Ustr;
use crate::msgbus::{
Handler, IntoHandler, ShareableMessageHandler, TypedHandler, TypedIntoHandler,
typed_handler::shareable_handler,
};
/// Stub handler that forwards each received [`Message`] to a stored callback.
#[derive(Clone)]
pub struct StubMessageHandler {
/// Identifier reported through `Handler::id`.
id: Ustr,
/// Closure invoked with a clone of every `Message` passed to `handle`.
callback: Arc<dyn Fn(Message) + Send>,
}
impl Debug for StubMessageHandler {
    /// Renders only the `id` field; the callback trait object has no `Debug`
    /// implementation and is therefore left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("StubMessageHandler");
        builder.field("id", &self.id);
        builder.finish()
    }
}
impl Handler<dyn Any> for StubMessageHandler {
    /// Returns the identifier this handler was constructed with.
    fn id(&self) -> Ustr {
        self.id
    }

    /// Passes a clone of `message` to the callback when it is a [`Message`];
    /// values of any other type are ignored.
    fn handle(&self, message: &dyn Any) {
        let Some(msg) = message.downcast_ref::<Message>() else {
            return;
        };
        (self.callback)(msg.clone());
    }
}
/// Builds a [`StubMessageHandler`] wrapped as a [`ShareableMessageHandler`].
///
/// When `id` is `None`, a freshly generated UUID4 string is used as the
/// handler ID. The installed callback only renders each message through its
/// `Debug` implementation and throws the resulting string away.
#[must_use]
pub fn get_stub_shareable_handler(id: Option<Ustr>) -> ShareableMessageHandler {
    let unique_id = id.unwrap_or_else(|| Ustr::from(UUID4::new().as_str()));
    shareable_handler(Rc::new(StubMessageHandler {
        id: unique_id,
        callback: Arc::new(|m: Message| {
            // Discard the formatted string explicitly so that no function-wide
            // `unused_must_use` suppression is required.
            let _ = format!("{m:?}");
        }),
    }))
}
/// Test handler that records whether `handle` has been invoked at least once.
#[derive(Debug, Clone)]
pub struct CallCheckHandler {
/// Identifier reported through `Handler::id`.
id: Ustr,
/// Flag set to `true` by `handle`; held in an `Arc`, so clones observe the same flag.
called: Arc<AtomicBool>,
}
impl CallCheckHandler {
    /// Creates a handler whose "called" flag starts out `false`.
    ///
    /// A UUID4 string is generated for the ID when `id` is `None`.
    #[must_use]
    pub fn new(id: Option<Ustr>) -> Self {
        let id = match id {
            Some(given) => given,
            None => Ustr::from(UUID4::new().as_str()),
        };
        let called = Arc::new(AtomicBool::new(false));
        Self { id, called }
    }

    /// Reports whether `handle` has been invoked on this handler or any clone of it.
    #[must_use]
    pub fn was_called(&self) -> bool {
        let flag: &AtomicBool = &self.called;
        flag.load(Ordering::SeqCst)
    }

    /// Wraps a clone of this checker (sharing the same flag) as a
    /// [`ShareableMessageHandler`].
    #[must_use]
    pub fn handler(&self) -> ShareableMessageHandler {
        let shared = Rc::new(self.clone());
        shareable_handler(shared)
    }
}
impl Handler<dyn Any> for CallCheckHandler {
fn id(&self) -> Ustr {
self.id
}
fn handle(&self, _message: &dyn Any) {
self.called.store(true, Ordering::SeqCst);
}
}
/// Creates a [`CallCheckHandler`] and returns it together with its shareable
/// wrapper, in the order `(shareable handler, checker)`.
#[must_use]
pub fn get_call_check_handler(id: Option<Ustr>) -> (ShareableMessageHandler, CallCheckHandler) {
    let probe = CallCheckHandler::new(id);
    (probe.handler(), probe)
}
/// Test handler that stores a clone of every received message of type `T`.
#[derive(Debug, Clone)]
pub struct AnySavingHandler<T> {
/// Identifier reported through `Handler::id`.
id: Ustr,
/// Saved messages; held in an `Rc`, so clones of the handler share one buffer.
messages: Rc<RefCell<Vec<T>>>,
}
impl<T: Clone + 'static> AnySavingHandler<T> {
    /// Creates a handler with an empty message buffer.
    ///
    /// A UUID4 string is generated for the ID when `id` is `None`.
    #[must_use]
    pub fn new(id: Option<Ustr>) -> Self {
        let id = match id {
            Some(given) => given,
            None => Ustr::from(UUID4::new().as_str()),
        };
        let messages = Rc::new(RefCell::new(Vec::new()));
        Self { id, messages }
    }

    /// Returns a copy of all messages saved so far.
    #[must_use]
    pub fn get_messages(&self) -> Vec<T> {
        let saved = self.messages.borrow();
        saved.to_vec()
    }

    /// Removes every saved message from the shared buffer.
    pub fn clear(&self) {
        let mut saved = self.messages.borrow_mut();
        saved.clear();
    }

    /// Wraps a clone of this saver (sharing the same buffer) as a
    /// [`ShareableMessageHandler`].
    #[must_use]
    pub fn handler(&self) -> ShareableMessageHandler {
        let shared = Rc::new(self.clone());
        shareable_handler(shared)
    }
}
impl<T: Clone + 'static> Handler<dyn Any> for AnySavingHandler<T> {
fn id(&self) -> Ustr {
self.id
}
fn handle(&self, message: &dyn Any) {
if let Some(m) = message.downcast_ref::<T>() {
self.messages.borrow_mut().push(m.clone());
} else {
log::error!(
"AnySavingHandler: expected {} got {:?}",
std::any::type_name::<T>(),
message.type_id()
);
}
}
}
/// Creates an [`AnySavingHandler`] and returns it together with its shareable
/// wrapper, in the order `(shareable handler, saver)`.
#[must_use]
pub fn get_any_saving_handler<T: Clone + 'static>(
    id: Option<Ustr>,
) -> (ShareableMessageHandler, AnySavingHandler<T>) {
    let recorder = AnySavingHandler::<T>::new(id);
    (recorder.handler(), recorder)
}
/// Alias for [`AnySavingHandler`]; both names refer to the same type.
pub type MessageSavingHandler<T> = AnySavingHandler<T>;
/// Test handler that stores a clone of every message of type `T` it receives by reference.
#[derive(Debug, Clone)]
pub struct TypedMessageSavingHandler<T> {
/// Identifier reported through `Handler::id`.
id: Ustr,
/// Saved messages; held in an `Rc`, so clones of the handler share one buffer.
messages: Rc<RefCell<Vec<T>>>,
}
impl<T: Clone + 'static> TypedMessageSavingHandler<T> {
    /// Creates a handler with an empty message buffer.
    ///
    /// A UUID4 string is generated for the ID when `id` is `None`.
    #[must_use]
    pub fn new(id: Option<Ustr>) -> Self {
        let unique_id = id.unwrap_or_else(|| Ustr::from(UUID4::new().as_str()));
        Self {
            id: unique_id,
            messages: Rc::new(RefCell::new(Vec::new())),
        }
    }

    /// Returns a copy of all messages saved so far.
    #[must_use]
    pub fn get_messages(&self) -> Vec<T> {
        self.messages.borrow().clone()
    }

    /// Removes every saved message from the shared buffer.
    ///
    /// Provided for parity with the other saving handlers in this module,
    /// which expose the same operation.
    pub fn clear(&self) {
        self.messages.borrow_mut().clear();
    }

    /// Wraps a clone of this saver (sharing the same buffer) in a [`TypedHandler`].
    #[must_use]
    pub fn handler(&self) -> TypedHandler<T> {
        TypedHandler::new(self.clone())
    }
}
impl<T: Clone + 'static> Handler<T> for TypedMessageSavingHandler<T> {
    /// Returns the identifier this handler was constructed with.
    fn id(&self) -> Ustr {
        self.id
    }

    /// Appends a clone of `message` to the shared buffer.
    fn handle(&self, message: &T) {
        let mut sink = self.messages.borrow_mut();
        sink.push(message.clone());
    }
}
/// Creates a [`TypedMessageSavingHandler`] and returns it together with its
/// [`TypedHandler`] wrapper, in the order `(typed handler, saver)`.
#[must_use]
pub fn get_typed_message_saving_handler<T: Clone + 'static>(
    id: Option<Ustr>,
) -> (TypedHandler<T>, TypedMessageSavingHandler<T>) {
    let recorder = TypedMessageSavingHandler::<T>::new(id);
    (recorder.handler(), recorder)
}
/// Test handler that takes ownership of every message of type `T` it receives and stores it.
#[derive(Debug, Clone)]
pub struct TypedIntoMessageSavingHandler<T> {
/// Identifier reported through `IntoHandler::id`.
id: Ustr,
/// Saved messages; held in an `Rc`, so handlers built from the same `Rc` share one buffer.
messages: Rc<RefCell<Vec<T>>>,
}
impl<T: 'static> TypedIntoMessageSavingHandler<T> {
    /// Creates a handler backed by a new, empty message buffer.
    ///
    /// A UUID4 string is generated for the ID when `id` is `None`.
    #[must_use]
    pub fn new(id: Option<Ustr>) -> Self {
        Self::new_with_messages(id, Rc::new(RefCell::new(Vec::new())))
    }

    /// Creates a handler that stores into the caller-supplied `messages` buffer.
    ///
    /// A UUID4 string is generated for the ID when `id` is `None`.
    #[must_use]
    pub fn new_with_messages(id: Option<Ustr>, messages: Rc<RefCell<Vec<T>>>) -> Self {
        let id = match id {
            Some(given) => given,
            None => Ustr::from(UUID4::new().as_str()),
        };
        Self { id, messages }
    }

    /// Returns a copy of all messages saved so far.
    #[must_use]
    pub fn get_messages(&self) -> Vec<T>
    where
        T: Clone,
    {
        let saved = self.messages.borrow();
        saved.to_vec()
    }

    /// Builds a [`TypedIntoHandler`] around a second handler that has the same
    /// ID and shares this handler's buffer.
    ///
    /// The copy is assembled field by field, so `T` does not need to be `Clone`.
    #[must_use]
    pub fn handler(&self) -> TypedIntoHandler<T> {
        let twin = Self {
            id: self.id,
            messages: Rc::clone(&self.messages),
        };
        TypedIntoHandler::new(twin)
    }

    /// Removes every saved message from the shared buffer.
    pub fn clear(&self) {
        let mut saved = self.messages.borrow_mut();
        saved.clear();
    }
}
impl<T: 'static> IntoHandler<T> for TypedIntoMessageSavingHandler<T> {
    /// Returns the identifier this handler was constructed with.
    fn id(&self) -> Ustr {
        self.id
    }

    /// Moves `message` into the shared buffer without cloning it.
    fn handle(&self, message: T) {
        let mut sink = self.messages.borrow_mut();
        sink.push(message);
    }
}
/// Creates a [`TypedIntoMessageSavingHandler`] and returns it together with its
/// [`TypedIntoHandler`] wrapper, in the order `(typed handler, saver)`.
#[must_use]
pub fn get_typed_into_message_saving_handler<T: 'static>(
    id: Option<Ustr>,
) -> (TypedIntoHandler<T>, TypedIntoMessageSavingHandler<T>) {
    let recorder = TypedIntoMessageSavingHandler::<T>::new(id);
    (recorder.handler(), recorder)
}
// Per-thread registry mapping a handler ID to its type-erased saving handler.
// `get_message_saving_handler` inserts entries and `get_saved_messages` looks
// them up; no code visible in this section removes entries.
thread_local! {
static SAVING_HANDLERS: RefCell<AHashMap<Ustr, Box<dyn std::any::Any>>> = RefCell::new(AHashMap::new());
}
/// Creates an [`AnySavingHandler<T>`], registers it in the thread-local
/// `SAVING_HANDLERS` map under its handler ID, and returns the shareable wrapper.
///
/// Registering a second handler with the same ID on the same thread replaces
/// the earlier registry entry.
#[must_use]
pub fn get_message_saving_handler<T: Clone + 'static>(id: Option<Ustr>) -> ShareableMessageHandler {
    let (shareable, saver) = get_any_saving_handler::<T>(id);
    let key = shareable.0.id();
    let erased: Box<dyn Any> = Box::new(saver);
    SAVING_HANDLERS.with_borrow_mut(|registry| {
        registry.insert(key, erased);
    });
    shareable
}
/// Returns a copy of the messages saved by the handler registered under
/// `handler`'s ID in the thread-local `SAVING_HANDLERS` map.
///
/// An empty vector is returned when no entry exists for that ID on the current
/// thread, or when the registered saver was not created for message type `T`.
#[must_use]
pub fn get_saved_messages<T: Clone + 'static>(handler: &ShareableMessageHandler) -> Vec<T> {
    let key = handler.0.id();
    SAVING_HANDLERS.with_borrow(|registry| {
        registry
            .get(&key)
            .and_then(|entry| entry.downcast_ref::<AnySavingHandler<T>>())
            .map(|saver| saver.get_messages())
            .unwrap_or_default()
    })
}