#![allow(unsafe_code)]
use std::{
cell::{RefCell, UnsafeCell},
fmt::Debug,
rc::Rc,
};
use ahash::{AHashMap, AHashSet};
use nautilus_model::identifiers::{ComponentId, TraderId};
use ustr::Ustr;
use crate::{
actor::{Actor, registry::get_actor_registry},
cache::Cache,
clock::Clock,
enums::{ComponentState, ComponentTrigger},
};
/// Lifecycle contract for components.
///
/// Implementors provide the identifier, the current state, the state
/// transition itself and registration. The provided lifecycle methods
/// (`start`, `stop`, ...) all follow the same sequence: apply the initiating
/// trigger, run the matching `on_*` hook, then apply the `*Completed` trigger.
pub trait Component {
    /// Returns the identifier of this component.
    fn component_id(&self) -> ComponentId;

    /// Returns the current state of this component.
    fn state(&self) -> ComponentState;

    /// Applies `trigger` to the component's state.
    ///
    /// # Errors
    ///
    /// Returns an error if the implementation cannot apply the trigger.
    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()>;

    /// Returns `true` if the current state is `Ready`.
    fn is_ready(&self) -> bool {
        self.state() == ComponentState::Ready
    }

    /// Returns `true` if the current state is anything other than `Running`.
    fn not_running(&self) -> bool {
        !self.is_running()
    }

    /// Returns `true` if the current state is `Running`.
    fn is_running(&self) -> bool {
        self.state() == ComponentState::Running
    }

    /// Returns `true` if the current state is `Stopped`.
    fn is_stopped(&self) -> bool {
        self.state() == ComponentState::Stopped
    }

    /// Returns `true` if the current state is `Degraded`.
    fn is_degraded(&self) -> bool {
        self.state() == ComponentState::Degraded
    }

    /// Returns `true` if the current state is `Faulted`.
    fn is_faulted(&self) -> bool {
        self.state() == ComponentState::Faulted
    }

    /// Returns `true` if the current state is `Disposed`.
    fn is_disposed(&self) -> bool {
        self.state() == ComponentState::Disposed
    }

    /// Registers the component with the given trader ID, clock and cache.
    ///
    /// # Errors
    ///
    /// Returns an error if the implementation fails to register.
    fn register(
        &mut self,
        trader_id: TraderId,
        clock: Rc<RefCell<dyn Clock>>,
        cache: Rc<RefCell<Cache>>,
    ) -> anyhow::Result<()>;

    /// Applies the `Initialize` trigger.
    ///
    /// # Errors
    ///
    /// Returns an error if the state transition fails.
    fn initialize(&mut self) -> anyhow::Result<()> {
        self.transition_state(ComponentTrigger::Initialize)
    }

    /// Applies `Start`, runs [`Component::on_start`], then applies `StartCompleted`.
    ///
    /// # Errors
    ///
    /// Returns an error if either transition fails or if the hook fails. A
    /// hook failure is logged and returned without applying `StartCompleted`.
    fn start(&mut self) -> anyhow::Result<()> {
        self.transition_state(ComponentTrigger::Start)?;

        if let Err(e) = self.on_start() {
            log_error(&e);
            return Err(e); // Halt before the completion trigger
        }

        self.transition_state(ComponentTrigger::StartCompleted)?;
        Ok(())
    }

    /// Applies `Stop`, runs [`Component::on_stop`], then applies `StopCompleted`.
    ///
    /// # Errors
    ///
    /// Returns an error if either transition fails or if the hook fails. A
    /// hook failure is logged and returned without applying `StopCompleted`.
    fn stop(&mut self) -> anyhow::Result<()> {
        self.transition_state(ComponentTrigger::Stop)?;

        if let Err(e) = self.on_stop() {
            log_error(&e);
            return Err(e); // Halt before the completion trigger
        }

        self.transition_state(ComponentTrigger::StopCompleted)?;
        Ok(())
    }

    /// Applies `Resume`, runs [`Component::on_resume`], then applies `ResumeCompleted`.
    ///
    /// # Errors
    ///
    /// Returns an error if either transition fails or if the hook fails. A
    /// hook failure is logged and returned without applying `ResumeCompleted`.
    fn resume(&mut self) -> anyhow::Result<()> {
        self.transition_state(ComponentTrigger::Resume)?;

        if let Err(e) = self.on_resume() {
            log_error(&e);
            return Err(e); // Halt before the completion trigger
        }

        self.transition_state(ComponentTrigger::ResumeCompleted)?;
        Ok(())
    }

    /// Applies `Degrade`, runs [`Component::on_degrade`], then applies `DegradeCompleted`.
    ///
    /// # Errors
    ///
    /// Returns an error if either transition fails or if the hook fails. A
    /// hook failure is logged and returned without applying `DegradeCompleted`.
    fn degrade(&mut self) -> anyhow::Result<()> {
        self.transition_state(ComponentTrigger::Degrade)?;

        if let Err(e) = self.on_degrade() {
            log_error(&e);
            return Err(e); // Halt before the completion trigger
        }

        self.transition_state(ComponentTrigger::DegradeCompleted)?;
        Ok(())
    }

    /// Applies `Fault`, runs [`Component::on_fault`], then applies `FaultCompleted`.
    ///
    /// # Errors
    ///
    /// Returns an error if either transition fails or if the hook fails. A
    /// hook failure is logged and returned without applying `FaultCompleted`.
    fn fault(&mut self) -> anyhow::Result<()> {
        self.transition_state(ComponentTrigger::Fault)?;

        if let Err(e) = self.on_fault() {
            log_error(&e);
            return Err(e); // Halt before the completion trigger
        }

        self.transition_state(ComponentTrigger::FaultCompleted)?;
        Ok(())
    }

    /// Applies `Reset`, runs [`Component::on_reset`], then applies `ResetCompleted`.
    ///
    /// # Errors
    ///
    /// Returns an error if either transition fails or if the hook fails. A
    /// hook failure is logged and returned without applying `ResetCompleted`.
    fn reset(&mut self) -> anyhow::Result<()> {
        self.transition_state(ComponentTrigger::Reset)?;

        if let Err(e) = self.on_reset() {
            log_error(&e);
            return Err(e); // Halt before the completion trigger
        }

        self.transition_state(ComponentTrigger::ResetCompleted)?;
        Ok(())
    }

    /// Applies `Dispose`, runs [`Component::on_dispose`], then applies `DisposeCompleted`.
    ///
    /// # Errors
    ///
    /// Returns an error if either transition fails or if the hook fails. A
    /// hook failure is logged and returned without applying `DisposeCompleted`.
    fn dispose(&mut self) -> anyhow::Result<()> {
        self.transition_state(ComponentTrigger::Dispose)?;

        if let Err(e) = self.on_dispose() {
            log_error(&e);
            return Err(e); // Halt before the completion trigger
        }

        self.transition_state(ComponentTrigger::DisposeCompleted)?;
        Ok(())
    }

    /// Hook run by [`Component::start`]; the default only logs a warning.
    ///
    /// # Errors
    ///
    /// The default implementation never fails; overrides may return an error.
    fn on_start(&mut self) -> anyhow::Result<()> {
        log::warn!(
            "The `on_start` handler was called when not overridden, \
            it's expected that any actions required when starting the component \
            occur here, such as subscribing/requesting data",
        );
        Ok(())
    }

    /// Hook run by [`Component::stop`]; the default only logs a warning.
    ///
    /// # Errors
    ///
    /// The default implementation never fails; overrides may return an error.
    fn on_stop(&mut self) -> anyhow::Result<()> {
        log::warn!(
            "The `on_stop` handler was called when not overridden, \
            it's expected that any actions required when stopping the component \
            occur here, such as unsubscribing from data",
        );
        Ok(())
    }

    /// Hook run by [`Component::resume`]; the default only logs a warning.
    ///
    /// # Errors
    ///
    /// The default implementation never fails; overrides may return an error.
    fn on_resume(&mut self) -> anyhow::Result<()> {
        log::warn!(
            "The `on_resume` handler was called when not overridden, \
            it's expected that any actions required when resuming the component \
            following a stop occur here"
        );
        Ok(())
    }

    /// Hook run by [`Component::reset`]; the default only logs a warning.
    ///
    /// # Errors
    ///
    /// The default implementation never fails; overrides may return an error.
    fn on_reset(&mut self) -> anyhow::Result<()> {
        log::warn!(
            "The `on_reset` handler was called when not overridden, \
            it's expected that any actions required when resetting the component \
            occur here, such as resetting indicators and other state"
        );
        Ok(())
    }

    /// Hook run by [`Component::dispose`]; the default does nothing.
    ///
    /// # Errors
    ///
    /// The default implementation never fails; overrides may return an error.
    fn on_dispose(&mut self) -> anyhow::Result<()> {
        Ok(())
    }

    /// Hook run by [`Component::degrade`]; the default does nothing.
    ///
    /// # Errors
    ///
    /// The default implementation never fails; overrides may return an error.
    fn on_degrade(&mut self) -> anyhow::Result<()> {
        Ok(())
    }

    /// Hook run by [`Component::fault`]; the default does nothing.
    ///
    /// # Errors
    ///
    /// The default implementation never fails; overrides may return an error.
    fn on_fault(&mut self) -> anyhow::Result<()> {
        Ok(())
    }
}
fn log_error(e: &anyhow::Error) {
log::error!("{e}");
}
#[rustfmt::skip]
impl ComponentState {
    /// Computes the state reached by applying `trigger` to the current state.
    ///
    /// The receiver is not modified; the caller stores the returned state.
    /// Arms are grouped by the state they produce.
    ///
    /// # Errors
    ///
    /// Returns an error if `trigger` is not valid for the current state.
    pub fn transition(&mut self, trigger: &ComponentTrigger) -> anyhow::Result<Self> {
        let next = match (&*self, trigger) {
            // -> Ready
            (Self::PreInitialized, ComponentTrigger::Initialize)
            | (Self::Resetting, ComponentTrigger::ResetCompleted) => Self::Ready,

            // -> Resetting
            (Self::Ready | Self::Stopped, ComponentTrigger::Reset) => Self::Resetting,

            // -> Starting
            (Self::Ready, ComponentTrigger::Start) => Self::Starting,

            // -> Running
            (Self::Starting, ComponentTrigger::StartCompleted)
            | (Self::Resuming, ComponentTrigger::ResumeCompleted) => Self::Running,

            // -> Resuming
            (Self::Stopped | Self::Degraded, ComponentTrigger::Resume) => Self::Resuming,

            // -> Stopping
            (
                Self::Starting | Self::Running | Self::Resuming | Self::Degraded,
                ComponentTrigger::Stop,
            ) => Self::Stopping,

            // -> Stopped
            (Self::Stopping, ComponentTrigger::StopCompleted) => Self::Stopped,

            // -> Degrading / Degraded
            (Self::Running, ComponentTrigger::Degrade) => Self::Degrading,
            (Self::Degrading, ComponentTrigger::DegradeCompleted) => Self::Degraded,

            // -> Faulting / Faulted
            (
                Self::Starting
                | Self::Running
                | Self::Resuming
                | Self::Stopping
                | Self::Stopped
                | Self::Degraded,
                ComponentTrigger::Fault,
            ) => Self::Faulting,
            (Self::Faulting, ComponentTrigger::FaultCompleted) => Self::Faulted,

            // -> Disposing / Disposed
            (Self::Ready | Self::Stopped, ComponentTrigger::Dispose) => Self::Disposing,
            (Self::Disposing, ComponentTrigger::DisposeCompleted) => Self::Disposed,

            _ => anyhow::bail!("Invalid state trigger {self} -> {trigger}"),
        };
        Ok(next)
    }
}
// Per-thread component registry: every thread that touches it gets its own
// independent `ComponentRegistry`, created on first access via `ComponentRegistry::new()`.
thread_local! {
static COMPONENT_REGISTRY: ComponentRegistry = ComponentRegistry::new();
}
/// Registry of components keyed by ID, plus a set of IDs that are currently
/// marked as mutably borrowed by a lifecycle call.
///
/// Both collections sit behind `RefCell`, so the registry is mutated through
/// shared references.
pub struct ComponentRegistry {
/// Registered components, keyed by component ID.
components: RefCell<AHashMap<Ustr, Rc<UnsafeCell<dyn Component>>>>,
/// IDs of components that currently hold a borrow marker.
borrows: RefCell<AHashSet<Ustr>>,
}
impl Debug for ComponentRegistry {
    /// Formats the registry as its registered component IDs and the number of
    /// active borrow markers; the components themselves are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let registered = self.components.borrow();
        let ids = registered.keys().collect::<Vec<&Ustr>>();

        let mut out = f.debug_struct("ComponentRegistry");
        out.field("components", &ids);
        out.field("active_borrows", &self.borrows.borrow().len());
        out.finish()
    }
}
impl Default for ComponentRegistry {
fn default() -> Self {
Self::new()
}
}
impl ComponentRegistry {
    /// Creates a registry with no components and no active borrow markers.
    pub fn new() -> Self {
        let components = RefCell::new(AHashMap::new());
        let borrows = RefCell::new(AHashSet::new());
        Self { components, borrows }
    }

    /// Stores `component` under `id`, replacing any component already stored there.
    pub fn insert(&self, id: Ustr, component: Rc<UnsafeCell<dyn Component>>) {
        let mut components = self.components.borrow_mut();
        components.insert(id, component);
    }

    /// Returns a cloned handle to the component stored under `id`, if any.
    pub fn get(&self, id: &Ustr) -> Option<Rc<UnsafeCell<dyn Component>>> {
        let components = self.components.borrow();
        components.get(id).map(Rc::clone)
    }

    /// Returns `true` if `id` currently holds a borrow marker.
    pub fn is_borrowed(&self, id: &Ustr) -> bool {
        let active = self.borrows.borrow();
        active.contains(id)
    }

    /// Tries to set the borrow marker for `id`.
    ///
    /// Returns `true` if the marker was newly set and `false` if `id` was
    /// already marked (in which case nothing changes).
    fn try_borrow(&self, id: Ustr) -> bool {
        // `insert` reports whether the value was newly added, which is exactly
        // the "was it free?" answer needed here.
        self.borrows.borrow_mut().insert(id)
    }

    /// Clears the borrow marker for `id`; a no-op if it was not set.
    fn release_borrow(&self, id: &Ustr) {
        let mut active = self.borrows.borrow_mut();
        active.remove(id);
    }
}
/// Guard that releases the registry's borrow marker for `id` when dropped.
///
/// Constructing the guard does not set the marker; the caller is expected to
/// have set it already.
struct BorrowGuard {
/// ID of the component whose borrow marker is released on drop.
id: Ustr,
}
impl BorrowGuard {
fn new(id: Ustr) -> Self {
Self { id }
}
}
impl Drop for BorrowGuard {
    /// Clears this guard's borrow marker in the current thread's registry.
    fn drop(&mut self) {
        let registry = get_component_registry();
        registry.release_borrow(&self.id);
    }
}
/// Returns a reference to the current thread's component registry.
pub fn get_component_registry() -> &'static ComponentRegistry {
    COMPONENT_REGISTRY.with(|registry| {
        let ptr: *const ComponentRegistry = registry;
        // SAFETY: `ptr` comes from a live reference to this thread's
        // thread-local registry. The `'static` lifetime is an extension beyond
        // what `with` grants: the reference is only valid while the current
        // thread's thread-local storage is alive. `ComponentRegistry` contains
        // `RefCell`/`Rc`, so the reference cannot be sent to another thread.
        unsafe { &*ptr }
    })
}
/// Wraps `component` in a shared cell and registers it in the current
/// thread's component registry under its component ID.
///
/// Returns the concretely-typed handle; the registry keeps a second handle to
/// the same allocation as `dyn Component`.
pub fn register_component<T>(component: T) -> Rc<UnsafeCell<T>>
where
    T: Component + 'static,
{
    let id = component.component_id().inner();
    let shared = Rc::new(UnsafeCell::new(component));

    // The concrete handle is unsized to `dyn Component` at the call site.
    let for_registry = Rc::clone(&shared);
    get_component_registry().insert(id, for_registry);

    shared
}
/// Wraps `component` in a shared cell and registers it twice: in the component
/// registry under its component ID and in the actor registry under its actor ID.
///
/// Returns the concretely-typed handle; both registries keep handles to the
/// same allocation.
pub fn register_component_actor<T>(component: T) -> Rc<UnsafeCell<T>>
where
    T: Component + Actor + 'static,
{
    let component_key = component.component_id().inner();
    let actor_key = component.id();
    let shared = Rc::new(UnsafeCell::new(component));

    // Each clone is unsized to the respective trait object at the call site.
    let as_component = Rc::clone(&shared);
    get_component_registry().insert(component_key, as_component);

    let as_actor = Rc::clone(&shared);
    get_actor_registry().insert(actor_key, as_actor);

    shared
}
pub fn start_component(id: &Ustr) -> anyhow::Result<()> {
let registry = get_component_registry();
let component_ref = registry
.get(id)
.ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in global registry"))?;
if !registry.try_borrow(*id) {
anyhow::bail!(
"Component '{id}' is already mutably borrowed. \
This would create aliasing mutable references (undefined behavior)."
);
}
let _guard = BorrowGuard::new(*id);
unsafe {
let component = &mut *component_ref.get();
component.start()
}
}
pub fn stop_component(id: &Ustr) -> anyhow::Result<()> {
let registry = get_component_registry();
let component_ref = registry
.get(id)
.ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in global registry"))?;
if !registry.try_borrow(*id) {
anyhow::bail!(
"Component '{id}' is already mutably borrowed. \
This would create aliasing mutable references (undefined behavior)."
);
}
let _guard = BorrowGuard::new(*id);
unsafe {
let component = &mut *component_ref.get();
component.stop()
}
}
pub fn reset_component(id: &Ustr) -> anyhow::Result<()> {
let registry = get_component_registry();
let component_ref = registry
.get(id)
.ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in global registry"))?;
if !registry.try_borrow(*id) {
anyhow::bail!(
"Component '{id}' is already mutably borrowed. \
This would create aliasing mutable references (undefined behavior)."
);
}
let _guard = BorrowGuard::new(*id);
unsafe {
let component = &mut *component_ref.get();
component.reset()
}
}
pub fn dispose_component(id: &Ustr) -> anyhow::Result<()> {
let registry = get_component_registry();
let component_ref = registry
.get(id)
.ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in global registry"))?;
if !registry.try_borrow(*id) {
anyhow::bail!(
"Component '{id}' is already mutably borrowed. \
This would create aliasing mutable references (undefined behavior)."
);
}
let _guard = BorrowGuard::new(*id);
unsafe {
let component = &mut *component_ref.get();
component.dispose()
}
}
/// Returns a handle to the component registered under `id` in the current
/// thread's registry, or `None` if there is none.
pub fn get_component(id: &Ustr) -> Option<Rc<UnsafeCell<dyn Component>>> {
    let registry = get_component_registry();
    registry.get(id)
}
/// Test helper: removes every registered component and every borrow marker
/// from the current thread's registry.
#[cfg(test)]
pub fn clear_component_registry() {
    let ComponentRegistry {
        components,
        borrows,
    } = get_component_registry();
    components.borrow_mut().clear();
    borrows.borrow_mut().clear();
}
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};

    use rstest::rstest;

    use super::*;

    /// Minimal component whose `on_start` panics when `should_panic` is set.
    struct TestComponent {
        id: ComponentId,
        state: ComponentState,
        should_panic: &'static AtomicBool,
    }

    impl TestComponent {
        /// Creates a component named `name` that starts in the `Ready` state.
        fn new(name: &str, should_panic: &'static AtomicBool) -> Self {
            Self {
                id: ComponentId::new(name),
                state: ComponentState::Ready,
                should_panic,
            }
        }
    }

    impl Component for TestComponent {
        fn component_id(&self) -> ComponentId {
            self.id
        }

        fn state(&self) -> ComponentState {
            self.state
        }

        fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
            self.state = self.state.transition(&trigger)?;
            Ok(())
        }

        fn register(
            &mut self,
            _trader_id: TraderId,
            _clock: Rc<RefCell<dyn Clock>>,
            _cache: Rc<RefCell<Cache>>,
        ) -> anyhow::Result<()> {
            Ok(())
        }

        #[expect(clippy::panic_in_result_fn)]
        fn on_start(&mut self) -> anyhow::Result<()> {
            assert!(
                !self.should_panic.load(Ordering::SeqCst),
                "Intentional panic for testing"
            );
            Ok(())
        }
    }

    static NO_PANIC: AtomicBool = AtomicBool::new(false);
    static DO_PANIC: AtomicBool = AtomicBool::new(true);

    #[rstest]
    fn test_component_borrow_tracking_prevents_double_borrow() {
        clear_component_registry();

        let id = Ustr::from("test-component-1");
        let component = TestComponent::new("test-component-1", &NO_PANIC);
        let component_id = component.id.inner();
        let component_ref = Rc::new(UnsafeCell::new(component));
        get_component_registry().insert(component_id, component_ref);

        // Simulate an in-flight lifecycle call by holding the borrow marker:
        // a second call for the same component must be rejected, and the
        // rejected call must not clear the marker it does not own.
        let registry = get_component_registry();
        assert!(registry.try_borrow(id));
        let rejected = start_component(&id);
        assert!(
            rejected.is_err(),
            "Lifecycle call succeeded while the component was marked as borrowed"
        );
        assert!(registry.is_borrowed(&id));
        registry.release_borrow(&id);

        // With the marker released, sequential lifecycle calls succeed
        let result1 = start_component(&id);
        assert!(result1.is_ok());

        let result2 = stop_component(&id);
        assert!(result2.is_ok());
    }

    #[rstest]
    fn test_component_borrow_released_after_lifecycle_call() {
        clear_component_registry();

        let id = Ustr::from("test-component-2");
        let component = TestComponent::new("test-component-2", &NO_PANIC);
        let component_id = component.id.inner();
        let component_ref = Rc::new(UnsafeCell::new(component));
        get_component_registry().insert(component_id, component_ref);

        let _ = start_component(&id);

        // The marker must be gone once the lifecycle call has returned
        assert!(!get_component_registry().is_borrowed(&id));
    }

    #[rstest]
    fn test_component_borrow_released_on_panic() {
        clear_component_registry();

        let id = Ustr::from("test-component-panic");
        let component = TestComponent::new("test-component-panic", &DO_PANIC);
        let component_id = component.id.inner();
        let component_ref = Rc::new(UnsafeCell::new(component));
        get_component_registry().insert(component_id, component_ref);

        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _ = start_component(&id);
        }));
        assert!(result.is_err(), "Expected panic from on_start");

        // The guard's `Drop` runs during unwinding, so the marker is released
        assert!(
            !get_component_registry().is_borrowed(&id),
            "Borrow was not released after panic"
        );
    }
}