1#![allow(unsafe_code)]
23
24use std::{
25 cell::{RefCell, UnsafeCell},
26 fmt::Debug,
27 rc::Rc,
28};
29
30use ahash::{AHashMap, AHashSet};
31use nautilus_model::identifiers::{ComponentId, TraderId};
32use ustr::Ustr;
33
34use crate::{
35 actor::{Actor, registry::with_actor_registry},
36 cache::Cache,
37 clock::Clock,
38 enums::{ComponentState, ComponentTrigger},
39};
40
41pub trait Component {
43 fn component_id(&self) -> ComponentId;
45
46 fn state(&self) -> ComponentState;
48
49 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()>;
55
56 fn is_ready(&self) -> bool {
58 self.state() == ComponentState::Ready
59 }
60
61 fn not_running(&self) -> bool {
63 !self.is_running()
64 }
65
66 fn is_running(&self) -> bool {
68 self.state() == ComponentState::Running
69 }
70
71 fn is_stopped(&self) -> bool {
73 self.state() == ComponentState::Stopped
74 }
75
76 fn is_degraded(&self) -> bool {
78 self.state() == ComponentState::Degraded
79 }
80
81 fn is_faulted(&self) -> bool {
83 self.state() == ComponentState::Faulted
84 }
85
86 fn is_disposed(&self) -> bool {
88 self.state() == ComponentState::Disposed
89 }
90
91 fn register(
97 &mut self,
98 trader_id: TraderId,
99 clock: Rc<RefCell<dyn Clock>>,
100 cache: Rc<RefCell<Cache>>,
101 ) -> anyhow::Result<()>;
102
103 fn initialize(&mut self) -> anyhow::Result<()> {
109 self.transition_state(ComponentTrigger::Initialize)
110 }
111
112 fn start(&mut self) -> anyhow::Result<()> {
118 self.transition_state(ComponentTrigger::Start)?; if let Err(e) = self.on_start() {
121 log_error(self.component_id(), &e);
122 return Err(e); }
124
125 self.transition_state(ComponentTrigger::StartCompleted)?;
126
127 Ok(())
128 }
129
130 fn stop(&mut self) -> anyhow::Result<()> {
136 self.transition_state(ComponentTrigger::Stop)?; if let Err(e) = self.on_stop() {
139 log_error(self.component_id(), &e);
140 return Err(e); }
142
143 self.transition_state(ComponentTrigger::StopCompleted)?;
144
145 Ok(())
146 }
147
148 fn resume(&mut self) -> anyhow::Result<()> {
154 self.transition_state(ComponentTrigger::Resume)?; if let Err(e) = self.on_resume() {
157 log_error(self.component_id(), &e);
158 return Err(e); }
160
161 self.transition_state(ComponentTrigger::ResumeCompleted)?;
162
163 Ok(())
164 }
165
166 fn degrade(&mut self) -> anyhow::Result<()> {
172 self.transition_state(ComponentTrigger::Degrade)?; if let Err(e) = self.on_degrade() {
175 log_error(self.component_id(), &e);
176 return Err(e); }
178
179 self.transition_state(ComponentTrigger::DegradeCompleted)?;
180
181 Ok(())
182 }
183
184 fn fault(&mut self) -> anyhow::Result<()> {
190 self.transition_state(ComponentTrigger::Fault)?; if let Err(e) = self.on_fault() {
193 log_error(self.component_id(), &e);
194 return Err(e); }
196
197 self.transition_state(ComponentTrigger::FaultCompleted)?;
198
199 Ok(())
200 }
201
202 fn reset(&mut self) -> anyhow::Result<()> {
208 self.transition_state(ComponentTrigger::Reset)?; if let Err(e) = self.on_reset() {
211 log_error(self.component_id(), &e);
212 return Err(e); }
214
215 self.transition_state(ComponentTrigger::ResetCompleted)?;
216
217 Ok(())
218 }
219
220 fn dispose(&mut self) -> anyhow::Result<()> {
226 self.transition_state(ComponentTrigger::Dispose)?; if let Err(e) = self.on_dispose() {
229 log_error(self.component_id(), &e);
230 return Err(e); }
232
233 self.transition_state(ComponentTrigger::DisposeCompleted)?;
234
235 Ok(())
236 }
237
238 fn on_start(&mut self) -> anyhow::Result<()> {
244 log::warn!(
245 "The `on_start` handler was called when not overridden, \
246 it's expected that any actions required when stopping the component \
247 occur here, such as unsubscribing from data",
248 );
249 Ok(())
250 }
251
252 fn on_stop(&mut self) -> anyhow::Result<()> {
258 log::warn!(
259 "The `on_stop` handler was called when not overridden, \
260 it's expected that any actions required when stopping the component \
261 occur here, such as unsubscribing from data",
262 );
263 Ok(())
264 }
265
266 fn on_resume(&mut self) -> anyhow::Result<()> {
272 log::warn!(
273 "The `on_resume` handler was called when not overridden, \
274 it's expected that any actions required when resuming the component \
275 following a stop occur here"
276 );
277 Ok(())
278 }
279
280 fn on_reset(&mut self) -> anyhow::Result<()> {
286 log::warn!(
287 "The `on_reset` handler was called when not overridden, \
288 it's expected that any actions required when resetting the component \
289 occur here, such as resetting indicators and other state"
290 );
291 Ok(())
292 }
293
294 fn on_dispose(&mut self) -> anyhow::Result<()> {
300 Ok(())
301 }
302
303 fn on_degrade(&mut self) -> anyhow::Result<()> {
309 Ok(())
310 }
311
312 fn on_fault(&mut self) -> anyhow::Result<()> {
318 Ok(())
319 }
320}
321
322fn log_error(component: ComponentId, e: &anyhow::Error) {
323 log::error!(component = component.as_str(); "{e}");
324}
325
326#[rustfmt::skip]
327impl ComponentState {
328 pub fn transition(&mut self, trigger: &ComponentTrigger) -> anyhow::Result<Self> {
334 let new_state = match (&self, trigger) {
335 (Self::PreInitialized, ComponentTrigger::Initialize) => Self::Ready,
336 (Self::Ready, ComponentTrigger::Reset) => Self::Resetting,
337 (Self::Ready, ComponentTrigger::Start) => Self::Starting,
338 (Self::Ready, ComponentTrigger::Dispose) => Self::Disposing,
339 (Self::Resetting, ComponentTrigger::ResetCompleted) => Self::Ready,
340 (Self::Starting, ComponentTrigger::StartCompleted) => Self::Running,
341 (Self::Starting, ComponentTrigger::Stop) => Self::Stopping,
342 (Self::Starting, ComponentTrigger::Fault) => Self::Faulting,
343 (Self::Running, ComponentTrigger::Stop) => Self::Stopping,
344 (Self::Running, ComponentTrigger::Degrade) => Self::Degrading,
345 (Self::Running, ComponentTrigger::Fault) => Self::Faulting,
346 (Self::Resuming, ComponentTrigger::Stop) => Self::Stopping,
347 (Self::Resuming, ComponentTrigger::ResumeCompleted) => Self::Running,
348 (Self::Resuming, ComponentTrigger::Fault) => Self::Faulting,
349 (Self::Stopping, ComponentTrigger::StopCompleted) => Self::Stopped,
350 (Self::Stopping, ComponentTrigger::Fault) => Self::Faulting,
351 (Self::Stopped, ComponentTrigger::Reset) => Self::Resetting,
352 (Self::Stopped, ComponentTrigger::Resume) => Self::Resuming,
353 (Self::Stopped, ComponentTrigger::Dispose) => Self::Disposing,
354 (Self::Stopped, ComponentTrigger::Fault) => Self::Faulting,
355 (Self::Degrading, ComponentTrigger::DegradeCompleted) => Self::Degraded,
356 (Self::Degraded, ComponentTrigger::Resume) => Self::Resuming,
357 (Self::Degraded, ComponentTrigger::Stop) => Self::Stopping,
358 (Self::Degraded, ComponentTrigger::Fault) => Self::Faulting,
359 (Self::Disposing, ComponentTrigger::DisposeCompleted) => Self::Disposed,
360 (Self::Faulting, ComponentTrigger::FaultCompleted) => Self::Faulted,
361 _ => anyhow::bail!("Invalid state trigger {self} -> {trigger}"),
362 };
363 Ok(new_state)
364 }
365}
366
// Per-thread component registry: each thread gets its own `ComponentRegistry`,
// constructed with `ComponentRegistry::new()`.
thread_local! {
    static COMPONENT_REGISTRY: ComponentRegistry = ComponentRegistry::new();
}
370
/// Registry of components keyed by ID, plus bookkeeping of which IDs are currently
/// marked as borrowed.
///
/// Both collections sit behind `RefCell`, so every method takes `&self`.
pub struct ComponentRegistry {
    /// Registered components, keyed by the ID they were inserted under.
    components: RefCell<AHashMap<Ustr, Rc<UnsafeCell<dyn Component>>>>,
    /// IDs marked as borrowed via `try_borrow` and not yet released via `release_borrow`.
    borrows: RefCell<AHashSet<Ustr>>,
}
379
380impl Debug for ComponentRegistry {
381 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
382 let components_ref = self.components.borrow();
383 let keys: Vec<&Ustr> = components_ref.keys().collect();
384 f.debug_struct(stringify!(ComponentRegistry))
385 .field("components", &keys)
386 .field("active_borrows", &self.borrows.borrow().len())
387 .finish()
388 }
389}
390
impl Default for ComponentRegistry {
    /// Equivalent to [`ComponentRegistry::new`].
    fn default() -> Self {
        Self::new()
    }
}
396
397impl ComponentRegistry {
398 pub fn new() -> Self {
399 Self {
400 components: RefCell::new(AHashMap::new()),
401 borrows: RefCell::new(AHashSet::new()),
402 }
403 }
404
405 pub fn insert(&self, id: Ustr, component: Rc<UnsafeCell<dyn Component>>) {
406 self.components.borrow_mut().insert(id, component);
407 }
408
409 pub fn get(&self, id: &Ustr) -> Option<Rc<UnsafeCell<dyn Component>>> {
410 self.components.borrow().get(id).cloned()
411 }
412
413 pub fn is_borrowed(&self, id: &Ustr) -> bool {
415 self.borrows.borrow().contains(id)
416 }
417
418 fn try_borrow(&self, id: Ustr) -> bool {
420 let mut borrows = self.borrows.borrow_mut();
421 if borrows.contains(&id) {
422 false
423 } else {
424 borrows.insert(id);
425 true
426 }
427 }
428
429 fn release_borrow(&self, id: &Ustr) {
431 self.borrows.borrow_mut().remove(id);
432 }
433}
434
/// Drop guard holding the ID of a component whose borrow mark must be released.
///
/// Constructing the guard does not place the mark itself; the mark is only cleared
/// when the guard is dropped.
struct BorrowGuard {
    /// ID whose borrow mark is released on drop.
    id: Ustr,
}
442
impl BorrowGuard {
    /// Wraps `id` in a guard. This does not mark `id` as borrowed; the caller is
    /// expected to have done so already.
    fn new(id: Ustr) -> Self {
        Self { id }
    }
}
448
449impl Drop for BorrowGuard {
450 fn drop(&mut self) {
451 with_component_registry(|registry| registry.release_borrow(&self.id));
452 }
453}
454
455pub fn with_component_registry<R>(f: impl FnOnce(&ComponentRegistry) -> R) -> R {
456 COMPONENT_REGISTRY.with(f)
457}
458
459pub fn register_component<T>(component: T) -> Rc<UnsafeCell<T>>
461where
462 T: Component + 'static,
463{
464 let component_id = component.component_id().inner();
465 let component_ref = Rc::new(UnsafeCell::new(component));
466
467 let component_trait_ref: Rc<UnsafeCell<dyn Component>> = component_ref.clone();
469 with_component_registry(|registry| registry.insert(component_id, component_trait_ref));
470
471 component_ref
472}
473
474pub fn register_component_actor<T>(component: T) -> Rc<UnsafeCell<T>>
476where
477 T: Component + Actor + 'static,
478{
479 let component_id = component.component_id().inner();
480 let actor_id = component.id();
481 let component_ref = Rc::new(UnsafeCell::new(component));
482
483 let component_trait_ref: Rc<UnsafeCell<dyn Component>> = component_ref.clone();
485 with_component_registry(|registry| registry.insert(component_id, component_trait_ref));
486
487 let actor_trait_ref: Rc<UnsafeCell<dyn Actor>> = component_ref.clone();
489 with_actor_registry(|registry| registry.insert(actor_id, actor_trait_ref));
490
491 component_ref
492}
493
494pub fn start_component(id: &Ustr) -> anyhow::Result<()> {
502 let component_ref = with_component_registry(|registry| {
503 let component_ref = registry
504 .get(id)
505 .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in global registry"))?;
506
507 if !registry.try_borrow(*id) {
508 anyhow::bail!(
509 "Component '{id}' is already mutably borrowed. \
510 This would create aliasing mutable references (undefined behavior)."
511 );
512 }
513
514 Ok::<_, anyhow::Error>(component_ref)
515 })?;
516
517 let _guard = BorrowGuard::new(*id);
518
519 unsafe {
521 let component = &mut *component_ref.get();
522 component.start()
523 }
524}
525
526pub fn stop_component(id: &Ustr) -> anyhow::Result<()> {
534 let component_ref = with_component_registry(|registry| {
535 let component_ref = registry
536 .get(id)
537 .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in global registry"))?;
538
539 if !registry.try_borrow(*id) {
540 anyhow::bail!(
541 "Component '{id}' is already mutably borrowed. \
542 This would create aliasing mutable references (undefined behavior)."
543 );
544 }
545
546 Ok::<_, anyhow::Error>(component_ref)
547 })?;
548
549 let _guard = BorrowGuard::new(*id);
550
551 unsafe {
553 let component = &mut *component_ref.get();
554 component.stop()
555 }
556}
557
558pub fn reset_component(id: &Ustr) -> anyhow::Result<()> {
566 let component_ref = with_component_registry(|registry| {
567 let component_ref = registry
568 .get(id)
569 .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in global registry"))?;
570
571 if !registry.try_borrow(*id) {
572 anyhow::bail!(
573 "Component '{id}' is already mutably borrowed. \
574 This would create aliasing mutable references (undefined behavior)."
575 );
576 }
577
578 Ok::<_, anyhow::Error>(component_ref)
579 })?;
580
581 let _guard = BorrowGuard::new(*id);
582
583 unsafe {
585 let component = &mut *component_ref.get();
586 component.reset()
587 }
588}
589
590pub fn dispose_component(id: &Ustr) -> anyhow::Result<()> {
598 let component_ref = with_component_registry(|registry| {
599 let component_ref = registry
600 .get(id)
601 .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in global registry"))?;
602
603 if !registry.try_borrow(*id) {
604 anyhow::bail!(
605 "Component '{id}' is already mutably borrowed. \
606 This would create aliasing mutable references (undefined behavior)."
607 );
608 }
609
610 Ok::<_, anyhow::Error>(component_ref)
611 })?;
612
613 let _guard = BorrowGuard::new(*id);
614
615 unsafe {
617 let component = &mut *component_ref.get();
618 component.dispose()
619 }
620}
621
622pub fn get_component(id: &Ustr) -> Option<Rc<UnsafeCell<dyn Component>>> {
624 with_component_registry(|registry| registry.get(id))
625}
626
/// Test-only helper that removes every registered component and every borrow mark
/// from the current thread's registry.
#[cfg(test)]
pub fn clear_component_registry() {
    with_component_registry(|registry| {
        let ComponentRegistry { components, borrows } = registry;
        components.borrow_mut().clear();
        borrows.borrow_mut().clear();
    });
}
635
#[cfg(test)]
mod tests {
    use std::{
        any::Any,
        sync::atomic::{AtomicBool, Ordering},
    };

    use rstest::rstest;

    use super::*;

    /// Minimal component whose `on_start` panics when its `should_panic` flag is set.
    #[derive(Debug)]
    struct TestComponent {
        id: ComponentId,
        state: ComponentState,
        should_panic: &'static AtomicBool,
    }

    impl TestComponent {
        /// Creates a component in the `Ready` state so lifecycle calls can start it directly.
        fn new(name: &str, should_panic: &'static AtomicBool) -> Self {
            Self {
                id: ComponentId::new(name),
                state: ComponentState::Ready,
                should_panic,
            }
        }
    }

    impl Actor for TestComponent {
        fn id(&self) -> Ustr {
            self.id.inner()
        }

        fn handle(&mut self, _msg: &dyn Any) {}

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    impl Component for TestComponent {
        fn component_id(&self) -> ComponentId {
            self.id
        }

        fn state(&self) -> ComponentState {
            self.state
        }

        fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
            self.state = self.state.transition(&trigger)?;
            Ok(())
        }

        fn register(
            &mut self,
            _trader_id: TraderId,
            _clock: Rc<RefCell<dyn Clock>>,
            _cache: Rc<RefCell<Cache>>,
        ) -> anyhow::Result<()> {
            Ok(())
        }

        // The panic is deliberate: it lets a test verify the borrow mark is released
        // while unwinding.
        #[expect(clippy::panic_in_result_fn)]
        fn on_start(&mut self) -> anyhow::Result<()> {
            assert!(
                !self.should_panic.load(Ordering::SeqCst),
                "Intentional panic for testing"
            );
            Ok(())
        }
    }

    static NO_PANIC: AtomicBool = AtomicBool::new(false);
    static DO_PANIC: AtomicBool = AtomicBool::new(true);

    #[rstest]
    fn test_component_borrow_tracking_prevents_double_borrow() {
        clear_component_registry();

        let id = Ustr::from("test-component-1");
        let component = TestComponent::new("test-component-1", &NO_PANIC);
        let component_id = component.id.inner();

        let component_ref = Rc::new(UnsafeCell::new(component));
        with_component_registry(|registry| registry.insert(component_id, component_ref));

        // Simulate a lifecycle call that is still in flight by holding the borrow mark
        assert!(
            with_component_registry(|registry| registry.try_borrow(id)),
            "First borrow should be granted"
        );

        // A second, overlapping lifecycle call must be rejected
        let blocked = start_component(&id);
        assert!(
            blocked.is_err(),
            "Lifecycle call should fail while the component is borrowed"
        );

        // The rejected call must not have released the borrow it does not own
        assert!(
            with_component_registry(|registry| registry.is_borrowed(&id)),
            "Rejected call released a borrow it did not hold"
        );

        with_component_registry(|registry| registry.release_borrow(&id));

        // With the borrow released, sequential lifecycle calls succeed
        let result1 = start_component(&id);
        assert!(result1.is_ok());

        let result2 = stop_component(&id);
        assert!(result2.is_ok());
    }

    #[rstest]
    fn test_component_borrow_released_after_lifecycle_call() {
        clear_component_registry();

        let id = Ustr::from("test-component-2");
        let component = TestComponent::new("test-component-2", &NO_PANIC);
        let component_id = component.id.inner();

        let component_ref = Rc::new(UnsafeCell::new(component));
        with_component_registry(|registry| registry.insert(component_id, component_ref));

        let result = start_component(&id);
        assert!(result.is_ok(), "Start should succeed from the Ready state");

        // The guard must have cleared the borrow mark on normal return
        assert!(!with_component_registry(
            |registry| registry.is_borrowed(&id)
        ));
    }

    #[rstest]
    fn test_component_borrow_released_on_panic() {
        clear_component_registry();

        let id = Ustr::from("test-component-panic");
        let component = TestComponent::new("test-component-panic", &DO_PANIC);
        let component_id = component.id.inner();

        let component_ref = Rc::new(UnsafeCell::new(component));
        with_component_registry(|registry| registry.insert(component_id, component_ref));

        // `on_start` panics; catch the unwind so the registry can be inspected afterwards
        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _ = start_component(&id);
        }));
        assert!(result.is_err(), "Expected panic from on_start");

        // The guard's `Drop` must have run during unwinding
        assert!(
            !with_component_registry(|registry| registry.is_borrowed(&id)),
            "Borrow was not released after panic"
        );
    }
}
775}