Skip to main content

nautilus_common/python/
actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
5//  You may not use this file except in compliance with the License.
6//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
7//  Unless required by applicable law or agreed to in writing, software
8//  distributed under the License is distributed on an "AS IS" BASIS,
9//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10//  See the License for the specific language governing permissions and
11//  limitations under the License.
12// -------------------------------------------------------------------------------------------------
13
14//! Python bindings for DataActor with complete command and event handler forwarding.
15
16use std::{
17    any::Any,
18    cell::{RefCell, UnsafeCell},
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroUsize,
22    ops::{Deref, DerefMut},
23    rc::Rc,
24};
25
26use nautilus_core::{
27    from_pydict,
28    nanos::UnixNanos,
29    python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err},
30};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33    Block, Blockchain, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap,
34};
35use nautilus_model::{
36    data::{
37        Bar, BarType, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38        MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
39    },
40    enums::BookType,
41    identifiers::{ActorId, ClientId, InstrumentId, TraderId, Venue},
42    instruments::InstrumentAny,
43    orderbook::OrderBook,
44    python::instruments::instrument_any_to_pyobject,
45};
46use pyo3::{prelude::*, types::PyDict};
47
48use crate::{
49    actor::{
50        Actor, DataActor,
51        data_actor::{DataActorConfig, DataActorCore, ImportableActorConfig},
52        registry::{get_actor_registry, try_get_actor_unchecked},
53    },
54    cache::Cache,
55    clock::Clock,
56    component::{Component, get_component_registry},
57    enums::ComponentState,
58    python::{cache::PyCache, clock::PyClock, logging::PyLogger},
59    signal::Signal,
60    timer::{TimeEvent, TimeEventCallback},
61};
62
63#[pyo3::pymethods]
64impl DataActorConfig {
65    #[new]
66    #[pyo3(signature = (actor_id=None, log_events=true, log_commands=true))]
67    fn py_new(actor_id: Option<ActorId>, log_events: bool, log_commands: bool) -> Self {
68        Self {
69            actor_id,
70            log_events,
71            log_commands,
72        }
73    }
74}
75
76#[pyo3::pymethods]
77impl ImportableActorConfig {
78    #[new]
79    fn py_new(actor_path: String, config_path: String, config: Py<PyDict>) -> PyResult<Self> {
80        let json_config = Python::attach(|py| -> PyResult<HashMap<String, serde_json::Value>> {
81            let kwargs = PyDict::new(py);
82            kwargs.set_item("default", py.eval(pyo3::ffi::c_str!("str"), None, None)?)?;
83            let json_str: String = PyModule::import(py, "json")?
84                .call_method("dumps", (config.bind(py),), Some(&kwargs))?
85                .extract()?;
86
87            let json_value: serde_json::Value =
88                serde_json::from_str(&json_str).map_err(to_pyvalue_err)?;
89
90            if let serde_json::Value::Object(map) = json_value {
91                Ok(map.into_iter().collect())
92            } else {
93                Err(to_pyvalue_err("Config must be a dictionary"))
94            }
95        })?;
96
97        Ok(Self {
98            actor_path,
99            config_path,
100            config: json_config,
101        })
102    }
103
104    #[getter]
105    fn actor_path(&self) -> &String {
106        &self.actor_path
107    }
108
109    #[getter]
110    fn config_path(&self) -> &String {
111        &self.config_path
112    }
113
114    #[getter]
115    fn config(&self, py: Python<'_>) -> PyResult<Py<PyDict>> {
116        // Convert HashMap<String, serde_json::Value> back to Python dict
117        let py_dict = PyDict::new(py);
118        for (key, value) in &self.config {
119            // Convert serde_json::Value back to Python object via JSON
120            let json_str = serde_json::to_string(value).map_err(to_pyvalue_err)?;
121            let py_value = PyModule::import(py, "json")?.call_method("loads", (json_str,), None)?;
122            py_dict.set_item(key, py_value)?;
123        }
124        Ok(py_dict.unbind())
125    }
126}
127
128/// Inner state of PyDataActor, shared between Python wrapper and Rust registries.
129///
130/// This type holds the actual actor state and implements all the actor traits.
131/// It is wrapped in `Rc<UnsafeCell<>>` to allow shared ownership between Python
132/// and the global registries without copying.
133pub struct PyDataActorInner {
134    core: DataActorCore,
135    py_self: Option<Py<PyAny>>,
136    clock: PyClock,
137    logger: PyLogger,
138}
139
140impl Debug for PyDataActorInner {
141    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142        f.debug_struct(stringify!(PyDataActorInner))
143            .field("core", &self.core)
144            .field("py_self", &self.py_self.as_ref().map(|_| "<Py<PyAny>>"))
145            .field("clock", &self.clock)
146            .field("logger", &self.logger)
147            .finish()
148    }
149}
150
151impl Deref for PyDataActorInner {
152    type Target = DataActorCore;
153
154    fn deref(&self) -> &Self::Target {
155        &self.core
156    }
157}
158
159impl DerefMut for PyDataActorInner {
160    fn deref_mut(&mut self) -> &mut Self::Target {
161        &mut self.core
162    }
163}
164
165impl PyDataActorInner {
166    fn dispatch_on_start(&self) -> PyResult<()> {
167        if let Some(ref py_self) = self.py_self {
168            Python::attach(|py| py_self.call_method0(py, "on_start"))?;
169        }
170        Ok(())
171    }
172
173    fn dispatch_on_stop(&mut self) -> PyResult<()> {
174        if let Some(ref py_self) = self.py_self {
175            Python::attach(|py| py_self.call_method0(py, "on_stop"))?;
176        }
177        Ok(())
178    }
179
180    fn dispatch_on_resume(&mut self) -> PyResult<()> {
181        if let Some(ref py_self) = self.py_self {
182            Python::attach(|py| py_self.call_method0(py, "on_resume"))?;
183        }
184        Ok(())
185    }
186
187    fn dispatch_on_reset(&mut self) -> PyResult<()> {
188        if let Some(ref py_self) = self.py_self {
189            Python::attach(|py| py_self.call_method0(py, "on_reset"))?;
190        }
191        Ok(())
192    }
193
194    fn dispatch_on_dispose(&mut self) -> PyResult<()> {
195        if let Some(ref py_self) = self.py_self {
196            Python::attach(|py| py_self.call_method0(py, "on_dispose"))?;
197        }
198        Ok(())
199    }
200
201    fn dispatch_on_degrade(&mut self) -> PyResult<()> {
202        if let Some(ref py_self) = self.py_self {
203            Python::attach(|py| py_self.call_method0(py, "on_degrade"))?;
204        }
205        Ok(())
206    }
207
208    fn dispatch_on_fault(&mut self) -> PyResult<()> {
209        if let Some(ref py_self) = self.py_self {
210            Python::attach(|py| py_self.call_method0(py, "on_fault"))?;
211        }
212        Ok(())
213    }
214
215    fn dispatch_on_time_event(&mut self, event: TimeEvent) -> PyResult<()> {
216        if let Some(ref py_self) = self.py_self {
217            Python::attach(|py| {
218                py_self.call_method1(py, "on_time_event", (event.into_py_any_unwrap(py),))
219            })?;
220        }
221        Ok(())
222    }
223
224    fn dispatch_on_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
225        if let Some(ref py_self) = self.py_self {
226            Python::attach(|py| py_self.call_method1(py, "on_data", (data,)))?;
227        }
228        Ok(())
229    }
230
231    fn dispatch_on_signal(&mut self, signal: &Signal) -> PyResult<()> {
232        if let Some(ref py_self) = self.py_self {
233            Python::attach(|py| {
234                py_self.call_method1(py, "on_signal", (signal.clone().into_py_any_unwrap(py),))
235            })?;
236        }
237        Ok(())
238    }
239
240    fn dispatch_on_instrument(&mut self, instrument: Py<PyAny>) -> PyResult<()> {
241        if let Some(ref py_self) = self.py_self {
242            Python::attach(|py| py_self.call_method1(py, "on_instrument", (instrument,)))?;
243        }
244        Ok(())
245    }
246
247    fn dispatch_on_quote(&mut self, quote: QuoteTick) -> PyResult<()> {
248        if let Some(ref py_self) = self.py_self {
249            Python::attach(|py| {
250                py_self.call_method1(py, "on_quote", (quote.into_py_any_unwrap(py),))
251            })?;
252        }
253        Ok(())
254    }
255
256    fn dispatch_on_trade(&mut self, trade: TradeTick) -> PyResult<()> {
257        if let Some(ref py_self) = self.py_self {
258            Python::attach(|py| {
259                py_self.call_method1(py, "on_trade", (trade.into_py_any_unwrap(py),))
260            })?;
261        }
262        Ok(())
263    }
264
265    fn dispatch_on_bar(&mut self, bar: Bar) -> PyResult<()> {
266        if let Some(ref py_self) = self.py_self {
267            Python::attach(|py| py_self.call_method1(py, "on_bar", (bar.into_py_any_unwrap(py),)))?;
268        }
269        Ok(())
270    }
271
272    fn dispatch_on_book_deltas(&mut self, deltas: OrderBookDeltas) -> PyResult<()> {
273        if let Some(ref py_self) = self.py_self {
274            Python::attach(|py| {
275                py_self.call_method1(py, "on_book_deltas", (deltas.into_py_any_unwrap(py),))
276            })?;
277        }
278        Ok(())
279    }
280
281    fn dispatch_on_book(&mut self, book: &OrderBook) -> PyResult<()> {
282        if let Some(ref py_self) = self.py_self {
283            Python::attach(|py| {
284                py_self.call_method1(py, "on_book", (book.clone().into_py_any_unwrap(py),))
285            })?;
286        }
287        Ok(())
288    }
289
290    fn dispatch_on_mark_price(&mut self, mark_price: MarkPriceUpdate) -> PyResult<()> {
291        if let Some(ref py_self) = self.py_self {
292            Python::attach(|py| {
293                py_self.call_method1(py, "on_mark_price", (mark_price.into_py_any_unwrap(py),))
294            })?;
295        }
296        Ok(())
297    }
298
299    fn dispatch_on_index_price(&mut self, index_price: IndexPriceUpdate) -> PyResult<()> {
300        if let Some(ref py_self) = self.py_self {
301            Python::attach(|py| {
302                py_self.call_method1(py, "on_index_price", (index_price.into_py_any_unwrap(py),))
303            })?;
304        }
305        Ok(())
306    }
307
308    fn dispatch_on_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> PyResult<()> {
309        if let Some(ref py_self) = self.py_self {
310            Python::attach(|py| {
311                py_self.call_method1(
312                    py,
313                    "on_funding_rate",
314                    (funding_rate.into_py_any_unwrap(py),),
315                )
316            })?;
317        }
318        Ok(())
319    }
320
321    fn dispatch_on_instrument_status(&mut self, data: InstrumentStatus) -> PyResult<()> {
322        if let Some(ref py_self) = self.py_self {
323            Python::attach(|py| {
324                py_self.call_method1(py, "on_instrument_status", (data.into_py_any_unwrap(py),))
325            })?;
326        }
327        Ok(())
328    }
329
330    fn dispatch_on_instrument_close(&mut self, update: InstrumentClose) -> PyResult<()> {
331        if let Some(ref py_self) = self.py_self {
332            Python::attach(|py| {
333                py_self.call_method1(py, "on_instrument_close", (update.into_py_any_unwrap(py),))
334            })?;
335        }
336        Ok(())
337    }
338
339    fn dispatch_on_historical_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
340        if let Some(ref py_self) = self.py_self {
341            Python::attach(|py| py_self.call_method1(py, "on_historical_data", (data,)))?;
342        }
343        Ok(())
344    }
345
346    fn dispatch_on_historical_quotes(&mut self, quotes: Vec<QuoteTick>) -> PyResult<()> {
347        if let Some(ref py_self) = self.py_self {
348            Python::attach(|py| {
349                let py_quotes: Vec<_> = quotes
350                    .into_iter()
351                    .map(|q| q.into_py_any_unwrap(py))
352                    .collect();
353                py_self.call_method1(py, "on_historical_quotes", (py_quotes,))
354            })?;
355        }
356        Ok(())
357    }
358
359    fn dispatch_on_historical_trades(&mut self, trades: Vec<TradeTick>) -> PyResult<()> {
360        if let Some(ref py_self) = self.py_self {
361            Python::attach(|py| {
362                let py_trades: Vec<_> = trades
363                    .into_iter()
364                    .map(|t| t.into_py_any_unwrap(py))
365                    .collect();
366                py_self.call_method1(py, "on_historical_trades", (py_trades,))
367            })?;
368        }
369        Ok(())
370    }
371
372    fn dispatch_on_historical_bars(&mut self, bars: Vec<Bar>) -> PyResult<()> {
373        if let Some(ref py_self) = self.py_self {
374            Python::attach(|py| {
375                let py_bars: Vec<_> = bars.into_iter().map(|b| b.into_py_any_unwrap(py)).collect();
376                py_self.call_method1(py, "on_historical_bars", (py_bars,))
377            })?;
378        }
379        Ok(())
380    }
381
382    fn dispatch_on_historical_mark_prices(
383        &mut self,
384        mark_prices: Vec<MarkPriceUpdate>,
385    ) -> PyResult<()> {
386        if let Some(ref py_self) = self.py_self {
387            Python::attach(|py| {
388                let py_prices: Vec<_> = mark_prices
389                    .into_iter()
390                    .map(|p| p.into_py_any_unwrap(py))
391                    .collect();
392                py_self.call_method1(py, "on_historical_mark_prices", (py_prices,))
393            })?;
394        }
395        Ok(())
396    }
397
398    fn dispatch_on_historical_index_prices(
399        &mut self,
400        index_prices: Vec<IndexPriceUpdate>,
401    ) -> PyResult<()> {
402        if let Some(ref py_self) = self.py_self {
403            Python::attach(|py| {
404                let py_prices: Vec<_> = index_prices
405                    .into_iter()
406                    .map(|p| p.into_py_any_unwrap(py))
407                    .collect();
408                py_self.call_method1(py, "on_historical_index_prices", (py_prices,))
409            })?;
410        }
411        Ok(())
412    }
413
414    #[cfg(feature = "defi")]
415    fn dispatch_on_block(&mut self, block: Block) -> PyResult<()> {
416        if let Some(ref py_self) = self.py_self {
417            Python::attach(|py| {
418                py_self.call_method1(py, "on_block", (block.into_py_any_unwrap(py),))
419            })?;
420        }
421        Ok(())
422    }
423
424    #[cfg(feature = "defi")]
425    fn dispatch_on_pool(&mut self, pool: Pool) -> PyResult<()> {
426        if let Some(ref py_self) = self.py_self {
427            Python::attach(|py| {
428                py_self.call_method1(py, "on_pool", (pool.into_py_any_unwrap(py),))
429            })?;
430        }
431        Ok(())
432    }
433
434    #[cfg(feature = "defi")]
435    fn dispatch_on_pool_swap(&mut self, swap: PoolSwap) -> PyResult<()> {
436        if let Some(ref py_self) = self.py_self {
437            Python::attach(|py| {
438                py_self.call_method1(py, "on_pool_swap", (swap.into_py_any_unwrap(py),))
439            })?;
440        }
441        Ok(())
442    }
443
444    #[cfg(feature = "defi")]
445    fn dispatch_on_pool_liquidity_update(&mut self, update: PoolLiquidityUpdate) -> PyResult<()> {
446        if let Some(ref py_self) = self.py_self {
447            Python::attach(|py| {
448                py_self.call_method1(
449                    py,
450                    "on_pool_liquidity_update",
451                    (update.into_py_any_unwrap(py),),
452                )
453            })?;
454        }
455        Ok(())
456    }
457
458    #[cfg(feature = "defi")]
459    fn dispatch_on_pool_fee_collect(&mut self, collect: PoolFeeCollect) -> PyResult<()> {
460        if let Some(ref py_self) = self.py_self {
461            Python::attach(|py| {
462                py_self.call_method1(py, "on_pool_fee_collect", (collect.into_py_any_unwrap(py),))
463            })?;
464        }
465        Ok(())
466    }
467
468    #[cfg(feature = "defi")]
469    fn dispatch_on_pool_flash(&mut self, flash: PoolFlash) -> PyResult<()> {
470        if let Some(ref py_self) = self.py_self {
471            Python::attach(|py| {
472                py_self.call_method1(py, "on_pool_flash", (flash.into_py_any_unwrap(py),))
473            })?;
474        }
475        Ok(())
476    }
477}
478
479fn dict_to_params(
480    py: Python<'_>,
481    params: Option<Py<PyDict>>,
482) -> PyResult<Option<nautilus_core::Params>> {
483    match params {
484        Some(dict) => from_pydict(py, dict),
485        None => Ok(None),
486    }
487}
488
489/// Python-facing wrapper for DataActor.
490///
491/// This wrapper holds shared ownership of `PyDataActorInner` via `Rc<UnsafeCell<>>`.
492/// Both Python (through this wrapper) and the global registries share the same
493/// underlying actor instance, ensuring mutations are visible from both sides.
494#[allow(non_camel_case_types)]
495#[pyo3::pyclass(
496    module = "nautilus_trader.common",
497    name = "DataActor",
498    unsendable,
499    subclass
500)]
501pub struct PyDataActor {
502    inner: Rc<UnsafeCell<PyDataActorInner>>,
503}
504
505impl Debug for PyDataActor {
506    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
507        f.debug_struct(stringify!(PyDataActor))
508            .field("inner", &self.inner())
509            .finish()
510    }
511}
512
513impl PyDataActor {
514    /// Returns a reference to the inner actor state.
515    ///
516    /// # Safety
517    ///
518    /// This is safe for single-threaded use. The `UnsafeCell` allows interior
519    /// mutability which is required for the registries to mutate the actor.
520    #[inline]
521    #[allow(unsafe_code)]
522    pub(crate) fn inner(&self) -> &PyDataActorInner {
523        unsafe { &*self.inner.get() }
524    }
525
526    /// Returns a mutable reference to the inner actor state.
527    ///
528    /// # Safety
529    ///
530    /// This is safe for single-threaded use. Callers must ensure no aliasing
531    /// mutable references exist.
532    #[inline]
533    #[allow(unsafe_code, clippy::mut_from_ref)]
534    pub(crate) fn inner_mut(&self) -> &mut PyDataActorInner {
535        unsafe { &mut *self.inner.get() }
536    }
537}
538
539impl Deref for PyDataActor {
540    type Target = DataActorCore;
541
542    fn deref(&self) -> &Self::Target {
543        &self.inner().core
544    }
545}
546
547impl DerefMut for PyDataActor {
548    fn deref_mut(&mut self) -> &mut Self::Target {
549        &mut self.inner_mut().core
550    }
551}
552
553impl PyDataActor {
554    // Rust constructor for tests and direct Rust usage
555    pub fn new(config: Option<DataActorConfig>) -> Self {
556        let config = config.unwrap_or_default();
557        let core = DataActorCore::new(config);
558        let clock = PyClock::new_test(); // Temporary clock, will be updated on registration
559        let logger = PyLogger::new(core.actor_id().as_str());
560
561        let inner = PyDataActorInner {
562            core,
563            py_self: None,
564            clock,
565            logger,
566        };
567
568        Self {
569            inner: Rc::new(UnsafeCell::new(inner)),
570        }
571    }
572
573    /// Sets the Python instance reference for method dispatch.
574    ///
575    /// This enables the PyDataActor to forward method calls (like `on_start`, `on_stop`)
576    /// to the original Python instance that contains this PyDataActor. This is essential
577    /// for Python inheritance to work correctly, allowing Python subclasses to override
578    /// DataActor methods and have them called by the Rust system.
579    pub fn set_python_instance(&mut self, py_obj: Py<PyAny>) {
580        self.inner_mut().py_self = Some(py_obj);
581    }
582
583    /// Updates the actor_id in both the core config and the actor_id field.
584    ///
585    /// This method is only exposed for the Python actor to assist with configuration and should
586    /// **never** be called post registration. Calling this after registration will cause
587    /// inconsistent state where the actor is registered under one ID but its internal actor_id
588    /// field contains another, breaking message routing and lifecycle management.
589    pub fn set_actor_id(&mut self, actor_id: ActorId) {
590        let inner = self.inner_mut();
591        inner.core.config.actor_id = Some(actor_id);
592        inner.core.actor_id = actor_id;
593    }
594
595    /// Updates the log_events setting in the core config.
596    pub fn set_log_events(&mut self, log_events: bool) {
597        self.inner_mut().core.config.log_events = log_events;
598    }
599
600    /// Updates the log_commands setting in the core config.
601    pub fn set_log_commands(&mut self, log_commands: bool) {
602        self.inner_mut().core.config.log_commands = log_commands;
603    }
604
605    /// Returns the memory address of this instance as a hexadecimal string.
606    pub fn mem_address(&self) -> String {
607        self.inner().core.mem_address()
608    }
609
610    /// Returns a value indicating whether the actor has been registered with a trader.
611    pub fn is_registered(&self) -> bool {
612        self.inner().core.is_registered()
613    }
614
615    /// Register the actor with a trader.
616    ///
617    /// # Errors
618    ///
619    /// Returns an error if the actor is already registered or if the registration process fails.
620    pub fn register(
621        &mut self,
622        trader_id: TraderId,
623        clock: Rc<RefCell<dyn Clock>>,
624        cache: Rc<RefCell<Cache>>,
625    ) -> anyhow::Result<()> {
626        let inner = self.inner_mut();
627        inner.core.register(trader_id, clock, cache)?;
628
629        inner.clock = PyClock::from_rc(inner.core.clock_rc());
630
631        // Register default time event handler for this actor
632        let actor_id = inner.actor_id().inner();
633        let callback = TimeEventCallback::from(move |event: TimeEvent| {
634            if let Some(mut actor) = try_get_actor_unchecked::<PyDataActorInner>(&actor_id) {
635                if let Err(e) = actor.on_time_event(&event) {
636                    log::error!("Python time event handler failed for actor {actor_id}: {e}");
637                }
638            } else {
639                log::error!("Actor {actor_id} not found for time event handling");
640            }
641        });
642
643        inner.clock.inner_mut().register_default_handler(callback);
644
645        inner.initialize()
646    }
647
648    /// Registers this actor in the global component and actor registries.
649    ///
650    /// Clones the internal `Rc` and inserts into both registries. This ensures
651    /// Python and the registries share the exact same actor instance.
652    pub fn register_in_global_registries(&self) {
653        let inner = self.inner();
654        let component_id = inner.component_id().inner();
655        let actor_id = Actor::id(inner);
656
657        let inner_ref: Rc<UnsafeCell<PyDataActorInner>> = self.inner.clone();
658
659        let component_trait_ref: Rc<UnsafeCell<dyn Component>> = inner_ref.clone();
660        get_component_registry().insert(component_id, component_trait_ref);
661
662        let actor_trait_ref: Rc<UnsafeCell<dyn Actor>> = inner_ref;
663        get_actor_registry().insert(actor_id, actor_trait_ref);
664    }
665}
666
667impl DataActor for PyDataActorInner {
668    fn on_start(&mut self) -> anyhow::Result<()> {
669        self.dispatch_on_start()
670            .map_err(|e| anyhow::anyhow!("Python on_start failed: {e}"))
671    }
672
673    fn on_stop(&mut self) -> anyhow::Result<()> {
674        self.dispatch_on_stop()
675            .map_err(|e| anyhow::anyhow!("Python on_stop failed: {e}"))
676    }
677
678    fn on_resume(&mut self) -> anyhow::Result<()> {
679        self.dispatch_on_resume()
680            .map_err(|e| anyhow::anyhow!("Python on_resume failed: {e}"))
681    }
682
683    fn on_reset(&mut self) -> anyhow::Result<()> {
684        self.dispatch_on_reset()
685            .map_err(|e| anyhow::anyhow!("Python on_reset failed: {e}"))
686    }
687
688    fn on_dispose(&mut self) -> anyhow::Result<()> {
689        self.dispatch_on_dispose()
690            .map_err(|e| anyhow::anyhow!("Python on_dispose failed: {e}"))
691    }
692
693    fn on_degrade(&mut self) -> anyhow::Result<()> {
694        self.dispatch_on_degrade()
695            .map_err(|e| anyhow::anyhow!("Python on_degrade failed: {e}"))
696    }
697
698    fn on_fault(&mut self) -> anyhow::Result<()> {
699        self.dispatch_on_fault()
700            .map_err(|e| anyhow::anyhow!("Python on_fault failed: {e}"))
701    }
702
703    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
704        self.dispatch_on_time_event(event.clone())
705            .map_err(|e| anyhow::anyhow!("Python on_time_event failed: {e}"))
706    }
707
708    #[allow(unused_variables)]
709    fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
710        Python::attach(|py| {
711            // TODO: Create a placeholder object since we can't easily convert &dyn Any to Py<PyAny>
712            // For now, we'll pass None and let Python subclasses handle specific data types
713            let py_data = py.None();
714
715            self.dispatch_on_data(py_data)
716                .map_err(|e| anyhow::anyhow!("Python on_data failed: {e}"))
717        })
718    }
719
720    fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
721        self.dispatch_on_signal(signal)
722            .map_err(|e| anyhow::anyhow!("Python on_signal failed: {e}"))
723    }
724
725    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
726        Python::attach(|py| {
727            let py_instrument = instrument_any_to_pyobject(py, instrument.clone())
728                .map_err(|e| anyhow::anyhow!("Failed to convert InstrumentAny to Python: {e}"))?;
729            self.dispatch_on_instrument(py_instrument)
730                .map_err(|e| anyhow::anyhow!("Python on_instrument failed: {e}"))
731        })
732    }
733
734    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
735        self.dispatch_on_quote(*quote)
736            .map_err(|e| anyhow::anyhow!("Python on_quote failed: {e}"))
737    }
738
739    fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
740        self.dispatch_on_trade(*tick)
741            .map_err(|e| anyhow::anyhow!("Python on_trade failed: {e}"))
742    }
743
744    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
745        self.dispatch_on_bar(*bar)
746            .map_err(|e| anyhow::anyhow!("Python on_bar failed: {e}"))
747    }
748
749    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
750        self.dispatch_on_book_deltas(deltas.clone())
751            .map_err(|e| anyhow::anyhow!("Python on_book_deltas failed: {e}"))
752    }
753
754    fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
755        self.dispatch_on_book(order_book)
756            .map_err(|e| anyhow::anyhow!("Python on_book failed: {e}"))
757    }
758
759    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
760        self.dispatch_on_mark_price(*mark_price)
761            .map_err(|e| anyhow::anyhow!("Python on_mark_price failed: {e}"))
762    }
763
764    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
765        self.dispatch_on_index_price(*index_price)
766            .map_err(|e| anyhow::anyhow!("Python on_index_price failed: {e}"))
767    }
768
769    fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
770        self.dispatch_on_funding_rate(*funding_rate)
771            .map_err(|e| anyhow::anyhow!("Python on_funding_rate failed: {e}"))
772    }
773
774    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
775        self.dispatch_on_instrument_status(*data)
776            .map_err(|e| anyhow::anyhow!("Python on_instrument_status failed: {e}"))
777    }
778
779    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
780        self.dispatch_on_instrument_close(*update)
781            .map_err(|e| anyhow::anyhow!("Python on_instrument_close failed: {e}"))
782    }
783
784    #[cfg(feature = "defi")]
785    fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
786        self.dispatch_on_block(block.clone())
787            .map_err(|e| anyhow::anyhow!("Python on_block failed: {e}"))
788    }
789
790    #[cfg(feature = "defi")]
791    fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
792        self.dispatch_on_pool(pool.clone())
793            .map_err(|e| anyhow::anyhow!("Python on_pool failed: {e}"))
794    }
795
796    #[cfg(feature = "defi")]
797    fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
798        self.dispatch_on_pool_swap(swap.clone())
799            .map_err(|e| anyhow::anyhow!("Python on_pool_swap failed: {e}"))
800    }
801
802    #[cfg(feature = "defi")]
803    fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
804        self.dispatch_on_pool_liquidity_update(update.clone())
805            .map_err(|e| anyhow::anyhow!("Python on_pool_liquidity_update failed: {e}"))
806    }
807
808    #[cfg(feature = "defi")]
809    fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
810        self.dispatch_on_pool_fee_collect(collect.clone())
811            .map_err(|e| anyhow::anyhow!("Python on_pool_fee_collect failed: {e}"))
812    }
813
814    #[cfg(feature = "defi")]
815    fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
816        self.dispatch_on_pool_flash(flash.clone())
817            .map_err(|e| anyhow::anyhow!("Python on_pool_flash failed: {e}"))
818    }
819
820    fn on_historical_data(&mut self, _data: &dyn Any) -> anyhow::Result<()> {
821        Python::attach(|py| {
822            let py_data = py.None();
823            self.dispatch_on_historical_data(py_data)
824                .map_err(|e| anyhow::anyhow!("Python on_historical_data failed: {e}"))
825        })
826    }
827
828    fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
829        self.dispatch_on_historical_quotes(quotes.to_vec())
830            .map_err(|e| anyhow::anyhow!("Python on_historical_quotes failed: {e}"))
831    }
832
833    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
834        self.dispatch_on_historical_trades(trades.to_vec())
835            .map_err(|e| anyhow::anyhow!("Python on_historical_trades failed: {e}"))
836    }
837
838    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
839        self.dispatch_on_historical_bars(bars.to_vec())
840            .map_err(|e| anyhow::anyhow!("Python on_historical_bars failed: {e}"))
841    }
842
843    fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
844        self.dispatch_on_historical_mark_prices(mark_prices.to_vec())
845            .map_err(|e| anyhow::anyhow!("Python on_historical_mark_prices failed: {e}"))
846    }
847
848    fn on_historical_index_prices(
849        &mut self,
850        index_prices: &[IndexPriceUpdate],
851    ) -> anyhow::Result<()> {
852        self.dispatch_on_historical_index_prices(index_prices.to_vec())
853            .map_err(|e| anyhow::anyhow!("Python on_historical_index_prices failed: {e}"))
854    }
855}
856
857#[pymethods]
858impl PyDataActor {
859    #[new]
860    #[pyo3(signature = (config=None))]
861    fn py_new(config: Option<DataActorConfig>) -> PyResult<Self> {
862        Ok(Self::new(config))
863    }
864
865    #[getter]
866    #[pyo3(name = "clock")]
867    fn py_clock(&self) -> PyResult<PyClock> {
868        let inner = self.inner();
869        if inner.core.is_registered() {
870            Ok(inner.clock.clone())
871        } else {
872            Err(to_pyruntime_err(
873                "Actor must be registered with a trader before accessing clock",
874            ))
875        }
876    }
877
878    #[getter]
879    #[pyo3(name = "cache")]
880    fn py_cache(&self) -> PyResult<PyCache> {
881        let inner = self.inner();
882        if inner.core.is_registered() {
883            Ok(PyCache::from_rc(inner.core.cache_rc()))
884        } else {
885            Err(to_pyruntime_err(
886                "Actor must be registered with a trader before accessing cache",
887            ))
888        }
889    }
890
891    #[getter]
892    #[pyo3(name = "log")]
893    fn py_log(&self) -> PyLogger {
894        self.inner().logger.clone()
895    }
896
897    #[getter]
898    #[pyo3(name = "actor_id")]
899    fn py_actor_id(&self) -> ActorId {
900        self.inner().core.actor_id
901    }
902
903    #[getter]
904    #[pyo3(name = "trader_id")]
905    fn py_trader_id(&self) -> Option<TraderId> {
906        self.inner().core.trader_id()
907    }
908
909    #[pyo3(name = "state")]
910    fn py_state(&self) -> ComponentState {
911        Component::state(self.inner())
912    }
913
914    #[pyo3(name = "is_ready")]
915    fn py_is_ready(&self) -> bool {
916        Component::is_ready(self.inner())
917    }
918
919    #[pyo3(name = "is_running")]
920    fn py_is_running(&self) -> bool {
921        Component::is_running(self.inner())
922    }
923
924    #[pyo3(name = "is_stopped")]
925    fn py_is_stopped(&self) -> bool {
926        Component::is_stopped(self.inner())
927    }
928
929    #[pyo3(name = "is_degraded")]
930    fn py_is_degraded(&self) -> bool {
931        Component::is_degraded(self.inner())
932    }
933
934    #[pyo3(name = "is_faulted")]
935    fn py_is_faulted(&self) -> bool {
936        Component::is_faulted(self.inner())
937    }
938
939    #[pyo3(name = "is_disposed")]
940    fn py_is_disposed(&self) -> bool {
941        Component::is_disposed(self.inner())
942    }
943
944    #[pyo3(name = "start")]
945    fn py_start(&mut self) -> PyResult<()> {
946        Component::start(self.inner_mut()).map_err(to_pyruntime_err)
947    }
948
949    #[pyo3(name = "stop")]
950    fn py_stop(&mut self) -> PyResult<()> {
951        Component::stop(self.inner_mut()).map_err(to_pyruntime_err)
952    }
953
954    #[pyo3(name = "resume")]
955    fn py_resume(&mut self) -> PyResult<()> {
956        Component::resume(self.inner_mut()).map_err(to_pyruntime_err)
957    }
958
959    #[pyo3(name = "reset")]
960    fn py_reset(&mut self) -> PyResult<()> {
961        Component::reset(self.inner_mut()).map_err(to_pyruntime_err)
962    }
963
964    #[pyo3(name = "dispose")]
965    fn py_dispose(&mut self) -> PyResult<()> {
966        Component::dispose(self.inner_mut()).map_err(to_pyruntime_err)
967    }
968
969    #[pyo3(name = "degrade")]
970    fn py_degrade(&mut self) -> PyResult<()> {
971        Component::degrade(self.inner_mut()).map_err(to_pyruntime_err)
972    }
973
974    #[pyo3(name = "fault")]
975    fn py_fault(&mut self) -> PyResult<()> {
976        Component::fault(self.inner_mut()).map_err(to_pyruntime_err)
977    }
978
979    #[pyo3(name = "shutdown_system")]
980    #[pyo3(signature = (reason=None))]
981    fn py_shutdown_system(&self, reason: Option<String>) -> PyResult<()> {
982        self.inner().core.shutdown_system(reason);
983        Ok(())
984    }
985
986    #[pyo3(name = "on_start")]
987    fn py_on_start(&self) -> PyResult<()> {
988        self.inner().dispatch_on_start()
989    }
990
991    #[pyo3(name = "on_stop")]
992    fn py_on_stop(&mut self) -> PyResult<()> {
993        self.inner_mut().dispatch_on_stop()
994    }
995
996    #[pyo3(name = "on_resume")]
997    fn py_on_resume(&mut self) -> PyResult<()> {
998        self.inner_mut().dispatch_on_resume()
999    }
1000
1001    #[pyo3(name = "on_reset")]
1002    fn py_on_reset(&mut self) -> PyResult<()> {
1003        self.inner_mut().dispatch_on_reset()
1004    }
1005
1006    #[pyo3(name = "on_dispose")]
1007    fn py_on_dispose(&mut self) -> PyResult<()> {
1008        self.inner_mut().dispatch_on_dispose()
1009    }
1010
1011    #[pyo3(name = "on_degrade")]
1012    fn py_on_degrade(&mut self) -> PyResult<()> {
1013        self.inner_mut().dispatch_on_degrade()
1014    }
1015
1016    #[pyo3(name = "on_fault")]
1017    fn py_on_fault(&mut self) -> PyResult<()> {
1018        self.inner_mut().dispatch_on_fault()
1019    }
1020
1021    #[pyo3(name = "on_time_event")]
1022    fn py_on_time_event(&mut self, event: TimeEvent) -> PyResult<()> {
1023        self.inner_mut().dispatch_on_time_event(event)
1024    }
1025
1026    #[pyo3(name = "on_data")]
1027    fn py_on_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
1028        self.inner_mut().dispatch_on_data(data)
1029    }
1030
1031    #[pyo3(name = "on_signal")]
1032    fn py_on_signal(&mut self, signal: &Signal) -> PyResult<()> {
1033        self.inner_mut().dispatch_on_signal(signal)
1034    }
1035
1036    #[pyo3(name = "on_instrument")]
1037    fn py_on_instrument(&mut self, instrument: Py<PyAny>) -> PyResult<()> {
1038        self.inner_mut().dispatch_on_instrument(instrument)
1039    }
1040
1041    #[pyo3(name = "on_quote")]
1042    fn py_on_quote(&mut self, quote: QuoteTick) -> PyResult<()> {
1043        self.inner_mut().dispatch_on_quote(quote)
1044    }
1045
1046    #[pyo3(name = "on_trade")]
1047    fn py_on_trade(&mut self, trade: TradeTick) -> PyResult<()> {
1048        self.inner_mut().dispatch_on_trade(trade)
1049    }
1050
1051    #[pyo3(name = "on_bar")]
1052    fn py_on_bar(&mut self, bar: Bar) -> PyResult<()> {
1053        self.inner_mut().dispatch_on_bar(bar)
1054    }
1055
1056    #[pyo3(name = "on_book_deltas")]
1057    fn py_on_book_deltas(&mut self, deltas: OrderBookDeltas) -> PyResult<()> {
1058        self.inner_mut().dispatch_on_book_deltas(deltas)
1059    }
1060
1061    #[pyo3(name = "on_book")]
1062    fn py_on_book(&mut self, book: &OrderBook) -> PyResult<()> {
1063        self.inner_mut().dispatch_on_book(book)
1064    }
1065
1066    #[pyo3(name = "on_mark_price")]
1067    fn py_on_mark_price(&mut self, mark_price: MarkPriceUpdate) -> PyResult<()> {
1068        self.inner_mut().dispatch_on_mark_price(mark_price)
1069    }
1070
1071    #[pyo3(name = "on_index_price")]
1072    fn py_on_index_price(&mut self, index_price: IndexPriceUpdate) -> PyResult<()> {
1073        self.inner_mut().dispatch_on_index_price(index_price)
1074    }
1075
1076    #[pyo3(name = "on_funding_rate")]
1077    fn py_on_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> PyResult<()> {
1078        self.inner_mut().dispatch_on_funding_rate(funding_rate)
1079    }
1080
1081    #[pyo3(name = "on_instrument_status")]
1082    fn py_on_instrument_status(&mut self, status: InstrumentStatus) -> PyResult<()> {
1083        self.inner_mut().dispatch_on_instrument_status(status)
1084    }
1085
1086    #[pyo3(name = "on_instrument_close")]
1087    fn py_on_instrument_close(&mut self, close: InstrumentClose) -> PyResult<()> {
1088        self.inner_mut().dispatch_on_instrument_close(close)
1089    }
1090
1091    #[cfg(feature = "defi")]
1092    #[pyo3(name = "on_block")]
1093    fn py_on_block(&mut self, block: Block) -> PyResult<()> {
1094        self.inner_mut().dispatch_on_block(block)
1095    }
1096
1097    #[cfg(feature = "defi")]
1098    #[pyo3(name = "on_pool")]
1099    fn py_on_pool(&mut self, pool: Pool) -> PyResult<()> {
1100        self.inner_mut().dispatch_on_pool(pool)
1101    }
1102
1103    #[cfg(feature = "defi")]
1104    #[pyo3(name = "on_pool_swap")]
1105    fn py_on_pool_swap(&mut self, swap: PoolSwap) -> PyResult<()> {
1106        self.inner_mut().dispatch_on_pool_swap(swap)
1107    }
1108
1109    #[cfg(feature = "defi")]
1110    #[pyo3(name = "on_pool_liquidity_update")]
1111    fn py_on_pool_liquidity_update(&mut self, update: PoolLiquidityUpdate) -> PyResult<()> {
1112        self.inner_mut().dispatch_on_pool_liquidity_update(update)
1113    }
1114
1115    #[cfg(feature = "defi")]
1116    #[pyo3(name = "on_pool_fee_collect")]
1117    fn py_on_pool_fee_collect(&mut self, update: PoolFeeCollect) -> PyResult<()> {
1118        self.inner_mut().dispatch_on_pool_fee_collect(update)
1119    }
1120
1121    #[cfg(feature = "defi")]
1122    #[pyo3(name = "on_pool_flash")]
1123    fn py_on_pool_flash(&mut self, flash: PoolFlash) -> PyResult<()> {
1124        self.inner_mut().dispatch_on_pool_flash(flash)
1125    }
1126
1127    #[pyo3(name = "subscribe_data")]
1128    #[pyo3(signature = (data_type, client_id=None, params=None))]
1129    fn py_subscribe_data(
1130        &mut self,
1131        py: Python<'_>,
1132        data_type: DataType,
1133        client_id: Option<ClientId>,
1134        params: Option<Py<PyDict>>,
1135    ) -> PyResult<()> {
1136        let params = dict_to_params(py, params)?;
1137        DataActor::subscribe_data(self.inner_mut(), data_type, client_id, params);
1138        Ok(())
1139    }
1140
1141    #[pyo3(name = "subscribe_instruments")]
1142    #[pyo3(signature = (venue, client_id=None, params=None))]
1143    fn py_subscribe_instruments(
1144        &mut self,
1145        py: Python<'_>,
1146        venue: Venue,
1147        client_id: Option<ClientId>,
1148        params: Option<Py<PyDict>>,
1149    ) -> PyResult<()> {
1150        let params = dict_to_params(py, params)?;
1151        DataActor::subscribe_instruments(self.inner_mut(), venue, client_id, params);
1152        Ok(())
1153    }
1154
1155    #[pyo3(name = "subscribe_instrument")]
1156    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1157    fn py_subscribe_instrument(
1158        &mut self,
1159        py: Python<'_>,
1160        instrument_id: InstrumentId,
1161        client_id: Option<ClientId>,
1162        params: Option<Py<PyDict>>,
1163    ) -> PyResult<()> {
1164        let params = dict_to_params(py, params)?;
1165        DataActor::subscribe_instrument(self.inner_mut(), instrument_id, client_id, params);
1166        Ok(())
1167    }
1168
1169    #[pyo3(name = "subscribe_book_deltas")]
1170    #[pyo3(signature = (instrument_id, book_type, depth=None, client_id=None, managed=false, params=None))]
1171    #[allow(clippy::too_many_arguments)]
1172    fn py_subscribe_book_deltas(
1173        &mut self,
1174        py: Python<'_>,
1175        instrument_id: InstrumentId,
1176        book_type: BookType,
1177        depth: Option<usize>,
1178        client_id: Option<ClientId>,
1179        managed: bool,
1180        params: Option<Py<PyDict>>,
1181    ) -> PyResult<()> {
1182        let params = dict_to_params(py, params)?;
1183        let depth = depth.and_then(NonZeroUsize::new);
1184        DataActor::subscribe_book_deltas(
1185            self.inner_mut(),
1186            instrument_id,
1187            book_type,
1188            depth,
1189            client_id,
1190            managed,
1191            params,
1192        );
1193        Ok(())
1194    }
1195
1196    #[pyo3(name = "subscribe_book_at_interval")]
1197    #[pyo3(signature = (instrument_id, book_type, interval_ms, depth=None, client_id=None, params=None))]
1198    #[allow(clippy::too_many_arguments)]
1199    fn py_subscribe_book_at_interval(
1200        &mut self,
1201        py: Python<'_>,
1202        instrument_id: InstrumentId,
1203        book_type: BookType,
1204        interval_ms: usize,
1205        depth: Option<usize>,
1206        client_id: Option<ClientId>,
1207        params: Option<Py<PyDict>>,
1208    ) -> PyResult<()> {
1209        let params = dict_to_params(py, params)?;
1210        let depth = depth.and_then(NonZeroUsize::new);
1211        let interval_ms = NonZeroUsize::new(interval_ms)
1212            .ok_or_else(|| to_pyvalue_err("interval_ms must be > 0"))?;
1213
1214        DataActor::subscribe_book_at_interval(
1215            self.inner_mut(),
1216            instrument_id,
1217            book_type,
1218            depth,
1219            interval_ms,
1220            client_id,
1221            params,
1222        );
1223        Ok(())
1224    }
1225
1226    #[pyo3(name = "subscribe_quotes")]
1227    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1228    fn py_subscribe_quotes(
1229        &mut self,
1230        py: Python<'_>,
1231        instrument_id: InstrumentId,
1232        client_id: Option<ClientId>,
1233        params: Option<Py<PyDict>>,
1234    ) -> PyResult<()> {
1235        let params = dict_to_params(py, params)?;
1236        DataActor::subscribe_quotes(self.inner_mut(), instrument_id, client_id, params);
1237        Ok(())
1238    }
1239
1240    #[pyo3(name = "subscribe_trades")]
1241    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1242    fn py_subscribe_trades(
1243        &mut self,
1244        py: Python<'_>,
1245        instrument_id: InstrumentId,
1246        client_id: Option<ClientId>,
1247        params: Option<Py<PyDict>>,
1248    ) -> PyResult<()> {
1249        let params = dict_to_params(py, params)?;
1250        DataActor::subscribe_trades(self.inner_mut(), instrument_id, client_id, params);
1251        Ok(())
1252    }
1253
1254    #[pyo3(name = "subscribe_bars")]
1255    #[pyo3(signature = (bar_type, client_id=None, params=None))]
1256    fn py_subscribe_bars(
1257        &mut self,
1258        py: Python<'_>,
1259        bar_type: BarType,
1260        client_id: Option<ClientId>,
1261        params: Option<Py<PyDict>>,
1262    ) -> PyResult<()> {
1263        let params = dict_to_params(py, params)?;
1264        DataActor::subscribe_bars(self.inner_mut(), bar_type, client_id, params);
1265        Ok(())
1266    }
1267
1268    #[pyo3(name = "subscribe_mark_prices")]
1269    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1270    fn py_subscribe_mark_prices(
1271        &mut self,
1272        py: Python<'_>,
1273        instrument_id: InstrumentId,
1274        client_id: Option<ClientId>,
1275        params: Option<Py<PyDict>>,
1276    ) -> PyResult<()> {
1277        let params = dict_to_params(py, params)?;
1278        DataActor::subscribe_mark_prices(self.inner_mut(), instrument_id, client_id, params);
1279        Ok(())
1280    }
1281
1282    #[pyo3(name = "subscribe_index_prices")]
1283    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1284    fn py_subscribe_index_prices(
1285        &mut self,
1286        py: Python<'_>,
1287        instrument_id: InstrumentId,
1288        client_id: Option<ClientId>,
1289        params: Option<Py<PyDict>>,
1290    ) -> PyResult<()> {
1291        let params = dict_to_params(py, params)?;
1292        DataActor::subscribe_index_prices(self.inner_mut(), instrument_id, client_id, params);
1293        Ok(())
1294    }
1295
1296    #[pyo3(name = "subscribe_funding_rates")]
1297    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1298    fn py_subscribe_funding_rates(
1299        &mut self,
1300        py: Python<'_>,
1301        instrument_id: InstrumentId,
1302        client_id: Option<ClientId>,
1303        params: Option<Py<PyDict>>,
1304    ) -> PyResult<()> {
1305        let params = dict_to_params(py, params)?;
1306        DataActor::subscribe_funding_rates(self.inner_mut(), instrument_id, client_id, params);
1307        Ok(())
1308    }
1309
1310    #[pyo3(name = "subscribe_instrument_status")]
1311    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1312    fn py_subscribe_instrument_status(
1313        &mut self,
1314        py: Python<'_>,
1315        instrument_id: InstrumentId,
1316        client_id: Option<ClientId>,
1317        params: Option<Py<PyDict>>,
1318    ) -> PyResult<()> {
1319        let params = dict_to_params(py, params)?;
1320        DataActor::subscribe_instrument_status(self.inner_mut(), instrument_id, client_id, params);
1321        Ok(())
1322    }
1323
1324    #[pyo3(name = "subscribe_instrument_close")]
1325    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1326    fn py_subscribe_instrument_close(
1327        &mut self,
1328        py: Python<'_>,
1329        instrument_id: InstrumentId,
1330        client_id: Option<ClientId>,
1331        params: Option<Py<PyDict>>,
1332    ) -> PyResult<()> {
1333        let params = dict_to_params(py, params)?;
1334        DataActor::subscribe_instrument_close(self.inner_mut(), instrument_id, client_id, params);
1335        Ok(())
1336    }
1337
1338    #[pyo3(name = "subscribe_order_fills")]
1339    #[pyo3(signature = (instrument_id))]
1340    fn py_subscribe_order_fills(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1341        DataActor::subscribe_order_fills(self.inner_mut(), instrument_id);
1342        Ok(())
1343    }
1344
1345    #[pyo3(name = "subscribe_order_cancels")]
1346    #[pyo3(signature = (instrument_id))]
1347    fn py_subscribe_order_cancels(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1348        DataActor::subscribe_order_cancels(self.inner_mut(), instrument_id);
1349        Ok(())
1350    }
1351
1352    #[cfg(feature = "defi")]
1353    #[pyo3(name = "subscribe_blocks")]
1354    #[pyo3(signature = (chain, client_id=None, params=None))]
1355    fn py_subscribe_blocks(
1356        &mut self,
1357        py: Python<'_>,
1358        chain: Blockchain,
1359        client_id: Option<ClientId>,
1360        params: Option<Py<PyDict>>,
1361    ) -> PyResult<()> {
1362        let params = dict_to_params(py, params)?;
1363        DataActor::subscribe_blocks(self.inner_mut(), chain, client_id, params);
1364        Ok(())
1365    }
1366
1367    #[cfg(feature = "defi")]
1368    #[pyo3(name = "subscribe_pool")]
1369    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1370    fn py_subscribe_pool(
1371        &mut self,
1372        py: Python<'_>,
1373        instrument_id: InstrumentId,
1374        client_id: Option<ClientId>,
1375        params: Option<Py<PyDict>>,
1376    ) -> PyResult<()> {
1377        let params = dict_to_params(py, params)?;
1378        DataActor::subscribe_pool(self.inner_mut(), instrument_id, client_id, params);
1379        Ok(())
1380    }
1381
1382    #[cfg(feature = "defi")]
1383    #[pyo3(name = "subscribe_pool_swaps")]
1384    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1385    fn py_subscribe_pool_swaps(
1386        &mut self,
1387        py: Python<'_>,
1388        instrument_id: InstrumentId,
1389        client_id: Option<ClientId>,
1390        params: Option<Py<PyDict>>,
1391    ) -> PyResult<()> {
1392        let params = dict_to_params(py, params)?;
1393        DataActor::subscribe_pool_swaps(self.inner_mut(), instrument_id, client_id, params);
1394        Ok(())
1395    }
1396
1397    #[cfg(feature = "defi")]
1398    #[pyo3(name = "subscribe_pool_liquidity_updates")]
1399    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1400    fn py_subscribe_pool_liquidity_updates(
1401        &mut self,
1402        py: Python<'_>,
1403        instrument_id: InstrumentId,
1404        client_id: Option<ClientId>,
1405        params: Option<Py<PyDict>>,
1406    ) -> PyResult<()> {
1407        let params = dict_to_params(py, params)?;
1408        DataActor::subscribe_pool_liquidity_updates(
1409            self.inner_mut(),
1410            instrument_id,
1411            client_id,
1412            params,
1413        );
1414        Ok(())
1415    }
1416
1417    #[cfg(feature = "defi")]
1418    #[pyo3(name = "subscribe_pool_fee_collects")]
1419    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1420    fn py_subscribe_pool_fee_collects(
1421        &mut self,
1422        py: Python<'_>,
1423        instrument_id: InstrumentId,
1424        client_id: Option<ClientId>,
1425        params: Option<Py<PyDict>>,
1426    ) -> PyResult<()> {
1427        let params = dict_to_params(py, params)?;
1428        DataActor::subscribe_pool_fee_collects(self.inner_mut(), instrument_id, client_id, params);
1429        Ok(())
1430    }
1431
1432    #[cfg(feature = "defi")]
1433    #[pyo3(name = "subscribe_pool_flash_events")]
1434    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1435    fn py_subscribe_pool_flash_events(
1436        &mut self,
1437        py: Python<'_>,
1438        instrument_id: InstrumentId,
1439        client_id: Option<ClientId>,
1440        params: Option<Py<PyDict>>,
1441    ) -> PyResult<()> {
1442        let params = dict_to_params(py, params)?;
1443        DataActor::subscribe_pool_flash_events(self.inner_mut(), instrument_id, client_id, params);
1444        Ok(())
1445    }
1446
1447    #[pyo3(name = "request_data")]
1448    #[pyo3(signature = (data_type, client_id, start=None, end=None, limit=None, params=None))]
1449    #[allow(clippy::too_many_arguments)]
1450    fn py_request_data(
1451        &mut self,
1452        py: Python<'_>,
1453        data_type: DataType,
1454        client_id: ClientId,
1455        start: Option<u64>,
1456        end: Option<u64>,
1457        limit: Option<usize>,
1458        params: Option<Py<PyDict>>,
1459    ) -> PyResult<String> {
1460        let params = dict_to_params(py, params)?;
1461        let limit = limit.and_then(NonZeroUsize::new);
1462        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1463        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1464
1465        let request_id = DataActor::request_data(
1466            self.inner_mut(),
1467            data_type,
1468            client_id,
1469            start,
1470            end,
1471            limit,
1472            params,
1473        )
1474        .map_err(to_pyvalue_err)?;
1475        Ok(request_id.to_string())
1476    }
1477
1478    #[pyo3(name = "request_instrument")]
1479    #[pyo3(signature = (instrument_id, start=None, end=None, client_id=None, params=None))]
1480    fn py_request_instrument(
1481        &mut self,
1482        py: Python<'_>,
1483        instrument_id: InstrumentId,
1484        start: Option<u64>,
1485        end: Option<u64>,
1486        client_id: Option<ClientId>,
1487        params: Option<Py<PyDict>>,
1488    ) -> PyResult<String> {
1489        let params = dict_to_params(py, params)?;
1490        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1491        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1492
1493        let request_id = DataActor::request_instrument(
1494            self.inner_mut(),
1495            instrument_id,
1496            start,
1497            end,
1498            client_id,
1499            params,
1500        )
1501        .map_err(to_pyvalue_err)?;
1502        Ok(request_id.to_string())
1503    }
1504
1505    #[pyo3(name = "request_instruments")]
1506    #[pyo3(signature = (venue=None, start=None, end=None, client_id=None, params=None))]
1507    fn py_request_instruments(
1508        &mut self,
1509        py: Python<'_>,
1510        venue: Option<Venue>,
1511        start: Option<u64>,
1512        end: Option<u64>,
1513        client_id: Option<ClientId>,
1514        params: Option<Py<PyDict>>,
1515    ) -> PyResult<String> {
1516        let params = dict_to_params(py, params)?;
1517        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1518        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1519
1520        let request_id =
1521            DataActor::request_instruments(self.inner_mut(), venue, start, end, client_id, params)
1522                .map_err(to_pyvalue_err)?;
1523        Ok(request_id.to_string())
1524    }
1525
1526    #[pyo3(name = "request_book_snapshot")]
1527    #[pyo3(signature = (instrument_id, depth=None, client_id=None, params=None))]
1528    fn py_request_book_snapshot(
1529        &mut self,
1530        py: Python<'_>,
1531        instrument_id: InstrumentId,
1532        depth: Option<usize>,
1533        client_id: Option<ClientId>,
1534        params: Option<Py<PyDict>>,
1535    ) -> PyResult<String> {
1536        let params = dict_to_params(py, params)?;
1537        let depth = depth.and_then(NonZeroUsize::new);
1538
1539        let request_id = DataActor::request_book_snapshot(
1540            self.inner_mut(),
1541            instrument_id,
1542            depth,
1543            client_id,
1544            params,
1545        )
1546        .map_err(to_pyvalue_err)?;
1547        Ok(request_id.to_string())
1548    }
1549
1550    #[pyo3(name = "request_quotes")]
1551    #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1552    #[allow(clippy::too_many_arguments)]
1553    fn py_request_quotes(
1554        &mut self,
1555        py: Python<'_>,
1556        instrument_id: InstrumentId,
1557        start: Option<u64>,
1558        end: Option<u64>,
1559        limit: Option<usize>,
1560        client_id: Option<ClientId>,
1561        params: Option<Py<PyDict>>,
1562    ) -> PyResult<String> {
1563        let params = dict_to_params(py, params)?;
1564        let limit = limit.and_then(NonZeroUsize::new);
1565        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1566        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1567
1568        let request_id = DataActor::request_quotes(
1569            self.inner_mut(),
1570            instrument_id,
1571            start,
1572            end,
1573            limit,
1574            client_id,
1575            params,
1576        )
1577        .map_err(to_pyvalue_err)?;
1578        Ok(request_id.to_string())
1579    }
1580
1581    #[pyo3(name = "request_trades")]
1582    #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1583    #[allow(clippy::too_many_arguments)]
1584    fn py_request_trades(
1585        &mut self,
1586        py: Python<'_>,
1587        instrument_id: InstrumentId,
1588        start: Option<u64>,
1589        end: Option<u64>,
1590        limit: Option<usize>,
1591        client_id: Option<ClientId>,
1592        params: Option<Py<PyDict>>,
1593    ) -> PyResult<String> {
1594        let params = dict_to_params(py, params)?;
1595        let limit = limit.and_then(NonZeroUsize::new);
1596        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1597        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1598
1599        let request_id = DataActor::request_trades(
1600            self.inner_mut(),
1601            instrument_id,
1602            start,
1603            end,
1604            limit,
1605            client_id,
1606            params,
1607        )
1608        .map_err(to_pyvalue_err)?;
1609        Ok(request_id.to_string())
1610    }
1611
1612    #[pyo3(name = "request_bars")]
1613    #[pyo3(signature = (bar_type, start=None, end=None, limit=None, client_id=None, params=None))]
1614    #[allow(clippy::too_many_arguments)]
1615    fn py_request_bars(
1616        &mut self,
1617        py: Python<'_>,
1618        bar_type: BarType,
1619        start: Option<u64>,
1620        end: Option<u64>,
1621        limit: Option<usize>,
1622        client_id: Option<ClientId>,
1623        params: Option<Py<PyDict>>,
1624    ) -> PyResult<String> {
1625        let params = dict_to_params(py, params)?;
1626        let limit = limit.and_then(NonZeroUsize::new);
1627        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1628        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1629
1630        let request_id = DataActor::request_bars(
1631            self.inner_mut(),
1632            bar_type,
1633            start,
1634            end,
1635            limit,
1636            client_id,
1637            params,
1638        )
1639        .map_err(to_pyvalue_err)?;
1640        Ok(request_id.to_string())
1641    }
1642
1643    #[pyo3(name = "unsubscribe_data")]
1644    #[pyo3(signature = (data_type, client_id=None, params=None))]
1645    fn py_unsubscribe_data(
1646        &mut self,
1647        py: Python<'_>,
1648        data_type: DataType,
1649        client_id: Option<ClientId>,
1650        params: Option<Py<PyDict>>,
1651    ) -> PyResult<()> {
1652        let params = dict_to_params(py, params)?;
1653        DataActor::unsubscribe_data(self.inner_mut(), data_type, client_id, params);
1654        Ok(())
1655    }
1656
1657    #[pyo3(name = "unsubscribe_instruments")]
1658    #[pyo3(signature = (venue, client_id=None, params=None))]
1659    fn py_unsubscribe_instruments(
1660        &mut self,
1661        py: Python<'_>,
1662        venue: Venue,
1663        client_id: Option<ClientId>,
1664        params: Option<Py<PyDict>>,
1665    ) -> PyResult<()> {
1666        let params = dict_to_params(py, params)?;
1667        DataActor::unsubscribe_instruments(self.inner_mut(), venue, client_id, params);
1668        Ok(())
1669    }
1670
1671    #[pyo3(name = "unsubscribe_instrument")]
1672    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1673    fn py_unsubscribe_instrument(
1674        &mut self,
1675        py: Python<'_>,
1676        instrument_id: InstrumentId,
1677        client_id: Option<ClientId>,
1678        params: Option<Py<PyDict>>,
1679    ) -> PyResult<()> {
1680        let params = dict_to_params(py, params)?;
1681        DataActor::unsubscribe_instrument(self.inner_mut(), instrument_id, client_id, params);
1682        Ok(())
1683    }
1684
1685    #[pyo3(name = "unsubscribe_book_deltas")]
1686    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1687    fn py_unsubscribe_book_deltas(
1688        &mut self,
1689        py: Python<'_>,
1690        instrument_id: InstrumentId,
1691        client_id: Option<ClientId>,
1692        params: Option<Py<PyDict>>,
1693    ) -> PyResult<()> {
1694        let params = dict_to_params(py, params)?;
1695        DataActor::unsubscribe_book_deltas(self.inner_mut(), instrument_id, client_id, params);
1696        Ok(())
1697    }
1698
1699    #[pyo3(name = "unsubscribe_book_at_interval")]
1700    #[pyo3(signature = (instrument_id, interval_ms, client_id=None, params=None))]
1701    fn py_unsubscribe_book_at_interval(
1702        &mut self,
1703        py: Python<'_>,
1704        instrument_id: InstrumentId,
1705        interval_ms: usize,
1706        client_id: Option<ClientId>,
1707        params: Option<Py<PyDict>>,
1708    ) -> PyResult<()> {
1709        let params = dict_to_params(py, params)?;
1710        let interval_ms = NonZeroUsize::new(interval_ms)
1711            .ok_or_else(|| to_pyvalue_err("interval_ms must be > 0"))?;
1712
1713        DataActor::unsubscribe_book_at_interval(
1714            self.inner_mut(),
1715            instrument_id,
1716            interval_ms,
1717            client_id,
1718            params,
1719        );
1720        Ok(())
1721    }
1722
1723    #[pyo3(name = "unsubscribe_quotes")]
1724    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1725    fn py_unsubscribe_quotes(
1726        &mut self,
1727        py: Python<'_>,
1728        instrument_id: InstrumentId,
1729        client_id: Option<ClientId>,
1730        params: Option<Py<PyDict>>,
1731    ) -> PyResult<()> {
1732        let params = dict_to_params(py, params)?;
1733        DataActor::unsubscribe_quotes(self.inner_mut(), instrument_id, client_id, params);
1734        Ok(())
1735    }
1736
1737    #[pyo3(name = "unsubscribe_trades")]
1738    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1739    fn py_unsubscribe_trades(
1740        &mut self,
1741        py: Python<'_>,
1742        instrument_id: InstrumentId,
1743        client_id: Option<ClientId>,
1744        params: Option<Py<PyDict>>,
1745    ) -> PyResult<()> {
1746        let params = dict_to_params(py, params)?;
1747        DataActor::unsubscribe_trades(self.inner_mut(), instrument_id, client_id, params);
1748        Ok(())
1749    }
1750
1751    #[pyo3(name = "unsubscribe_bars")]
1752    #[pyo3(signature = (bar_type, client_id=None, params=None))]
1753    fn py_unsubscribe_bars(
1754        &mut self,
1755        py: Python<'_>,
1756        bar_type: BarType,
1757        client_id: Option<ClientId>,
1758        params: Option<Py<PyDict>>,
1759    ) -> PyResult<()> {
1760        let params = dict_to_params(py, params)?;
1761        DataActor::unsubscribe_bars(self.inner_mut(), bar_type, client_id, params);
1762        Ok(())
1763    }
1764
1765    #[pyo3(name = "unsubscribe_mark_prices")]
1766    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1767    fn py_unsubscribe_mark_prices(
1768        &mut self,
1769        py: Python<'_>,
1770        instrument_id: InstrumentId,
1771        client_id: Option<ClientId>,
1772        params: Option<Py<PyDict>>,
1773    ) -> PyResult<()> {
1774        let params = dict_to_params(py, params)?;
1775        DataActor::unsubscribe_mark_prices(self.inner_mut(), instrument_id, client_id, params);
1776        Ok(())
1777    }
1778
1779    #[pyo3(name = "unsubscribe_index_prices")]
1780    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1781    fn py_unsubscribe_index_prices(
1782        &mut self,
1783        py: Python<'_>,
1784        instrument_id: InstrumentId,
1785        client_id: Option<ClientId>,
1786        params: Option<Py<PyDict>>,
1787    ) -> PyResult<()> {
1788        let params = dict_to_params(py, params)?;
1789        DataActor::unsubscribe_index_prices(self.inner_mut(), instrument_id, client_id, params);
1790        Ok(())
1791    }
1792
1793    #[pyo3(name = "unsubscribe_instrument_status")]
1794    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1795    fn py_unsubscribe_instrument_status(
1796        &mut self,
1797        py: Python<'_>,
1798        instrument_id: InstrumentId,
1799        client_id: Option<ClientId>,
1800        params: Option<Py<PyDict>>,
1801    ) -> PyResult<()> {
1802        let params = dict_to_params(py, params)?;
1803        DataActor::unsubscribe_instrument_status(
1804            self.inner_mut(),
1805            instrument_id,
1806            client_id,
1807            params,
1808        );
1809        Ok(())
1810    }
1811
1812    #[pyo3(name = "unsubscribe_instrument_close")]
1813    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1814    fn py_unsubscribe_instrument_close(
1815        &mut self,
1816        py: Python<'_>,
1817        instrument_id: InstrumentId,
1818        client_id: Option<ClientId>,
1819        params: Option<Py<PyDict>>,
1820    ) -> PyResult<()> {
1821        let params = dict_to_params(py, params)?;
1822        DataActor::unsubscribe_instrument_close(self.inner_mut(), instrument_id, client_id, params);
1823        Ok(())
1824    }
1825
1826    #[pyo3(name = "unsubscribe_order_fills")]
1827    #[pyo3(signature = (instrument_id))]
1828    fn py_unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1829        DataActor::unsubscribe_order_fills(self.inner_mut(), instrument_id);
1830        Ok(())
1831    }
1832
1833    #[pyo3(name = "unsubscribe_order_cancels")]
1834    #[pyo3(signature = (instrument_id))]
1835    fn py_unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1836        DataActor::unsubscribe_order_cancels(self.inner_mut(), instrument_id);
1837        Ok(())
1838    }
1839
1840    #[cfg(feature = "defi")]
1841    #[pyo3(name = "unsubscribe_blocks")]
1842    #[pyo3(signature = (chain, client_id=None, params=None))]
1843    fn py_unsubscribe_blocks(
1844        &mut self,
1845        py: Python<'_>,
1846        chain: Blockchain,
1847        client_id: Option<ClientId>,
1848        params: Option<Py<PyDict>>,
1849    ) -> PyResult<()> {
1850        let params = dict_to_params(py, params)?;
1851        DataActor::unsubscribe_blocks(self.inner_mut(), chain, client_id, params);
1852        Ok(())
1853    }
1854
1855    #[cfg(feature = "defi")]
1856    #[pyo3(name = "unsubscribe_pool")]
1857    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1858    fn py_unsubscribe_pool(
1859        &mut self,
1860        py: Python<'_>,
1861        instrument_id: InstrumentId,
1862        client_id: Option<ClientId>,
1863        params: Option<Py<PyDict>>,
1864    ) -> PyResult<()> {
1865        let params = dict_to_params(py, params)?;
1866        DataActor::unsubscribe_pool(self.inner_mut(), instrument_id, client_id, params);
1867        Ok(())
1868    }
1869
1870    #[cfg(feature = "defi")]
1871    #[pyo3(name = "unsubscribe_pool_swaps")]
1872    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1873    fn py_unsubscribe_pool_swaps(
1874        &mut self,
1875        py: Python<'_>,
1876        instrument_id: InstrumentId,
1877        client_id: Option<ClientId>,
1878        params: Option<Py<PyDict>>,
1879    ) -> PyResult<()> {
1880        let params = dict_to_params(py, params)?;
1881        DataActor::unsubscribe_pool_swaps(self.inner_mut(), instrument_id, client_id, params);
1882        Ok(())
1883    }
1884
1885    #[cfg(feature = "defi")]
1886    #[pyo3(name = "unsubscribe_pool_liquidity_updates")]
1887    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1888    fn py_unsubscribe_pool_liquidity_updates(
1889        &mut self,
1890        py: Python<'_>,
1891        instrument_id: InstrumentId,
1892        client_id: Option<ClientId>,
1893        params: Option<Py<PyDict>>,
1894    ) -> PyResult<()> {
1895        let params = dict_to_params(py, params)?;
1896        DataActor::unsubscribe_pool_liquidity_updates(
1897            self.inner_mut(),
1898            instrument_id,
1899            client_id,
1900            params,
1901        );
1902        Ok(())
1903    }
1904
1905    #[cfg(feature = "defi")]
1906    #[pyo3(name = "unsubscribe_pool_fee_collects")]
1907    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1908    fn py_unsubscribe_pool_fee_collects(
1909        &mut self,
1910        py: Python<'_>,
1911        instrument_id: InstrumentId,
1912        client_id: Option<ClientId>,
1913        params: Option<Py<PyDict>>,
1914    ) -> PyResult<()> {
1915        let params = dict_to_params(py, params)?;
1916        DataActor::unsubscribe_pool_fee_collects(
1917            self.inner_mut(),
1918            instrument_id,
1919            client_id,
1920            params,
1921        );
1922        Ok(())
1923    }
1924
1925    #[cfg(feature = "defi")]
1926    #[pyo3(name = "unsubscribe_pool_flash_events")]
1927    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1928    fn py_unsubscribe_pool_flash_events(
1929        &mut self,
1930        py: Python<'_>,
1931        instrument_id: InstrumentId,
1932        client_id: Option<ClientId>,
1933        params: Option<Py<PyDict>>,
1934    ) -> PyResult<()> {
1935        let params = dict_to_params(py, params)?;
1936        DataActor::unsubscribe_pool_flash_events(
1937            self.inner_mut(),
1938            instrument_id,
1939            client_id,
1940            params,
1941        );
1942        Ok(())
1943    }
1944
1945    #[allow(unused_variables)]
1946    #[pyo3(name = "on_historical_data")]
1947    fn py_on_historical_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
1948        // Default implementation - can be overridden in Python subclasses
1949        Ok(())
1950    }
1951
1952    #[allow(unused_variables)]
1953    #[pyo3(name = "on_historical_quotes")]
1954    fn py_on_historical_quotes(&mut self, quotes: Vec<QuoteTick>) -> PyResult<()> {
1955        // Default implementation - can be overridden in Python subclasses
1956        Ok(())
1957    }
1958
1959    #[allow(unused_variables)]
1960    #[pyo3(name = "on_historical_trades")]
1961    fn py_on_historical_trades(&mut self, trades: Vec<TradeTick>) -> PyResult<()> {
1962        // Default implementation - can be overridden in Python subclasses
1963        Ok(())
1964    }
1965
1966    #[allow(unused_variables)]
1967    #[pyo3(name = "on_historical_bars")]
1968    fn py_on_historical_bars(&mut self, bars: Vec<Bar>) -> PyResult<()> {
1969        // Default implementation - can be overridden in Python subclasses
1970        Ok(())
1971    }
1972
1973    #[allow(unused_variables)]
1974    #[pyo3(name = "on_historical_mark_prices")]
1975    fn py_on_historical_mark_prices(&mut self, mark_prices: Vec<MarkPriceUpdate>) -> PyResult<()> {
1976        // Default implementation - can be overridden in Python subclasses
1977        Ok(())
1978    }
1979
1980    #[allow(unused_variables)]
1981    #[pyo3(name = "on_historical_index_prices")]
1982    fn py_on_historical_index_prices(
1983        &mut self,
1984        index_prices: Vec<IndexPriceUpdate>,
1985    ) -> PyResult<()> {
1986        // Default implementation - can be overridden in Python subclasses
1987        Ok(())
1988    }
1989}
1990
1991#[cfg(test)]
1992mod tests {
1993    use std::{
1994        any::Any,
1995        cell::RefCell,
1996        collections::HashMap,
1997        ops::{Deref, DerefMut},
1998        rc::Rc,
1999        str::FromStr,
2000        sync::{Arc, Mutex},
2001    };
2002
2003    #[cfg(feature = "defi")]
2004    use alloy_primitives::{I256, U160};
2005    use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
2006    #[cfg(feature = "defi")]
2007    use nautilus_model::defi::{
2008        AmmType, Block, Blockchain, Chain, Dex, DexType, Pool, PoolIdentifier, PoolLiquidityUpdate,
2009        PoolSwap, Token,
2010    };
2011    use nautilus_model::{
2012        data::{
2013            Bar, BarType, DataType, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate,
2014            OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
2015        },
2016        enums::{AggressorSide, BookType, InstrumentCloseType, MarketStatusAction},
2017        identifiers::{ClientId, TradeId, TraderId, Venue},
2018        instruments::{CurrencyPair, InstrumentAny, stubs::audusd_sim},
2019        orderbook::OrderBook,
2020        types::{Price, Quantity},
2021    };
2022    use pyo3::{Py, PyAny, PyResult, Python, ffi::c_str, types::PyAnyMethods};
2023    use rstest::{fixture, rstest};
2024    use ustr::Ustr;
2025
2026    use super::PyDataActor;
2027    use crate::{
2028        actor::{DataActor, data_actor::DataActorCore},
2029        cache::Cache,
2030        clock::TestClock,
2031        component::Component,
2032        enums::ComponentState,
2033        runner::{SyncDataCommandSender, set_data_cmd_sender},
2034        signal::Signal,
2035        timer::TimeEvent,
2036    };
2037
2038    #[fixture]
2039    fn clock() -> Rc<RefCell<TestClock>> {
2040        Rc::new(RefCell::new(TestClock::new()))
2041    }
2042
2043    #[fixture]
2044    fn cache() -> Rc<RefCell<Cache>> {
2045        Rc::new(RefCell::new(Cache::new(None, None)))
2046    }
2047
2048    #[fixture]
2049    fn trader_id() -> TraderId {
2050        TraderId::from("TRADER-001")
2051    }
2052
2053    #[fixture]
2054    fn client_id() -> ClientId {
2055        ClientId::new("TestClient")
2056    }
2057
2058    #[fixture]
2059    fn venue() -> Venue {
2060        Venue::from("SIM")
2061    }
2062
2063    #[fixture]
2064    fn data_type() -> DataType {
2065        DataType::new("TestData", None)
2066    }
2067
2068    #[fixture]
2069    fn bar_type(audusd_sim: CurrencyPair) -> BarType {
2070        BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap()
2071    }
2072
2073    fn create_unregistered_actor() -> PyDataActor {
2074        PyDataActor::new(None)
2075    }
2076
2077    fn create_registered_actor(
2078        clock: Rc<RefCell<TestClock>>,
2079        cache: Rc<RefCell<Cache>>,
2080        trader_id: TraderId,
2081    ) -> PyDataActor {
2082        // Set up sync data command sender for tests
2083        let sender = SyncDataCommandSender;
2084        set_data_cmd_sender(Arc::new(sender));
2085
2086        let mut actor = PyDataActor::new(None);
2087        actor.register(trader_id, clock, cache).unwrap();
2088        actor
2089    }
2090
2091    #[rstest]
2092    fn test_new_actor_creation() {
2093        let actor = PyDataActor::new(None);
2094        assert!(actor.trader_id().is_none());
2095    }
2096
2097    #[rstest]
2098    fn test_clock_access_before_registration_raises_error() {
2099        let actor = PyDataActor::new(None);
2100
2101        // Accessing clock before registration should raise PyRuntimeError
2102        let result = actor.py_clock();
2103        assert!(result.is_err());
2104
2105        let error = result.unwrap_err();
2106        pyo3::Python::initialize();
2107        pyo3::Python::attach(|py| {
2108            assert!(error.is_instance_of::<pyo3::exceptions::PyRuntimeError>(py));
2109        });
2110
2111        let error_msg = error.to_string();
2112        assert!(
2113            error_msg.contains("Actor must be registered with a trader before accessing clock")
2114        );
2115    }
2116
2117    #[rstest]
2118    fn test_unregistered_actor_methods_work() {
2119        let actor = create_unregistered_actor();
2120
2121        assert!(!actor.py_is_ready());
2122        assert!(!actor.py_is_running());
2123        assert!(!actor.py_is_stopped());
2124        assert!(!actor.py_is_disposed());
2125        assert!(!actor.py_is_degraded());
2126        assert!(!actor.py_is_faulted());
2127
2128        // Verify unregistered state
2129        assert_eq!(actor.trader_id(), None);
2130    }
2131
2132    #[rstest]
2133    fn test_registration_success(
2134        clock: Rc<RefCell<TestClock>>,
2135        cache: Rc<RefCell<Cache>>,
2136        trader_id: TraderId,
2137    ) {
2138        let mut actor = create_unregistered_actor();
2139        actor.register(trader_id, clock, cache).unwrap();
2140        assert!(actor.trader_id().is_some());
2141        assert_eq!(actor.trader_id().unwrap(), trader_id);
2142    }
2143
2144    #[rstest]
2145    fn test_registered_actor_basic_properties(
2146        clock: Rc<RefCell<TestClock>>,
2147        cache: Rc<RefCell<Cache>>,
2148        trader_id: TraderId,
2149    ) {
2150        let actor = create_registered_actor(clock, cache, trader_id);
2151
2152        assert_eq!(actor.state(), ComponentState::Ready);
2153        assert_eq!(actor.trader_id(), Some(TraderId::from("TRADER-001")));
2154        assert!(actor.py_is_ready());
2155        assert!(!actor.py_is_running());
2156        assert!(!actor.py_is_stopped());
2157        assert!(!actor.py_is_disposed());
2158        assert!(!actor.py_is_degraded());
2159        assert!(!actor.py_is_faulted());
2160    }
2161
2162    #[rstest]
2163    fn test_basic_subscription_methods_compile(
2164        clock: Rc<RefCell<TestClock>>,
2165        cache: Rc<RefCell<Cache>>,
2166        trader_id: TraderId,
2167        data_type: DataType,
2168        client_id: ClientId,
2169        audusd_sim: CurrencyPair,
2170    ) {
2171        let mut actor = create_registered_actor(clock, cache, trader_id);
2172
2173        pyo3::Python::initialize();
2174        pyo3::Python::attach(|py| {
2175            assert!(
2176                actor
2177                    .py_subscribe_data(py, data_type.clone(), Some(client_id), None)
2178                    .is_ok()
2179            );
2180            assert!(
2181                actor
2182                    .py_subscribe_quotes(py, audusd_sim.id, Some(client_id), None)
2183                    .is_ok()
2184            );
2185            assert!(
2186                actor
2187                    .py_unsubscribe_data(py, data_type, Some(client_id), None)
2188                    .is_ok()
2189            );
2190            assert!(
2191                actor
2192                    .py_unsubscribe_quotes(py, audusd_sim.id, Some(client_id), None)
2193                    .is_ok()
2194            );
2195        });
2196    }
2197
2198    #[rstest]
2199    fn test_shutdown_system_passes_through(
2200        clock: Rc<RefCell<TestClock>>,
2201        cache: Rc<RefCell<Cache>>,
2202        trader_id: TraderId,
2203    ) {
2204        let actor = create_registered_actor(clock, cache, trader_id);
2205
2206        assert!(
2207            actor
2208                .py_shutdown_system(Some("Test shutdown".to_string()))
2209                .is_ok()
2210        );
2211        assert!(actor.py_shutdown_system(None).is_ok());
2212    }
2213
2214    #[rstest]
2215    fn test_book_at_interval_invalid_interval_ms(
2216        clock: Rc<RefCell<TestClock>>,
2217        cache: Rc<RefCell<Cache>>,
2218        trader_id: TraderId,
2219        audusd_sim: CurrencyPair,
2220    ) {
2221        pyo3::Python::initialize();
2222        let mut actor = create_registered_actor(clock, cache, trader_id);
2223
2224        pyo3::Python::attach(|py| {
2225            let result = actor.py_subscribe_book_at_interval(
2226                py,
2227                audusd_sim.id,
2228                BookType::L2_MBP,
2229                0,
2230                None,
2231                None,
2232                None,
2233            );
2234            assert!(result.is_err());
2235            assert_eq!(
2236                result.unwrap_err().to_string(),
2237                "ValueError: interval_ms must be > 0"
2238            );
2239
2240            let result = actor.py_unsubscribe_book_at_interval(py, audusd_sim.id, 0, None, None);
2241            assert!(result.is_err());
2242            assert_eq!(
2243                result.unwrap_err().to_string(),
2244                "ValueError: interval_ms must be > 0"
2245            );
2246        });
2247    }
2248
2249    #[rstest]
2250    fn test_request_methods_signatures_exist() {
2251        let actor = create_unregistered_actor();
2252        assert!(actor.trader_id().is_none());
2253    }
2254
2255    #[rstest]
2256    fn test_data_actor_trait_implementation(
2257        clock: Rc<RefCell<TestClock>>,
2258        cache: Rc<RefCell<Cache>>,
2259        trader_id: TraderId,
2260    ) {
2261        let actor = create_registered_actor(clock, cache, trader_id);
2262        let state = actor.state();
2263        assert_eq!(state, ComponentState::Ready);
2264    }
2265
2266    static CALL_TRACKER: std::sync::LazyLock<Arc<Mutex<HashMap<String, i32>>>> =
2267        std::sync::LazyLock::new(|| Arc::new(Mutex::new(HashMap::new())));
2268
2269    #[derive(Debug)]
2270    struct TestDataActor {
2271        inner: PyDataActor,
2272    }
2273
2274    impl TestDataActor {
2275        fn new() -> Self {
2276            Self {
2277                inner: PyDataActor::new(None),
2278            }
2279        }
2280
2281        fn track_call(&self, handler_name: &str) {
2282            let mut tracker = CALL_TRACKER.lock().expect(MUTEX_POISONED);
2283            *tracker.entry(handler_name.to_string()).or_insert(0) += 1;
2284        }
2285
2286        fn get_call_count(&self, handler_name: &str) -> i32 {
2287            let tracker = CALL_TRACKER.lock().expect(MUTEX_POISONED);
2288            tracker.get(handler_name).copied().unwrap_or(0)
2289        }
2290
2291        fn reset_tracker(&self) {
2292            let mut tracker = CALL_TRACKER.lock().expect(MUTEX_POISONED);
2293            tracker.clear();
2294        }
2295    }
2296
2297    impl Deref for TestDataActor {
2298        type Target = DataActorCore;
2299        fn deref(&self) -> &Self::Target {
2300            &self.inner.inner().core
2301        }
2302    }
2303
2304    impl DerefMut for TestDataActor {
2305        fn deref_mut(&mut self) -> &mut Self::Target {
2306            &mut self.inner.inner_mut().core
2307        }
2308    }
2309
2310    impl DataActor for TestDataActor {
2311        fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
2312            self.track_call("on_time_event");
2313            self.inner.inner_mut().on_time_event(event)
2314        }
2315
2316        fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
2317            self.track_call("on_data");
2318            self.inner.inner_mut().on_data(data)
2319        }
2320
2321        fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
2322            self.track_call("on_signal");
2323            self.inner.inner_mut().on_signal(signal)
2324        }
2325
2326        fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
2327            self.track_call("on_instrument");
2328            self.inner.inner_mut().on_instrument(instrument)
2329        }
2330
2331        fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
2332            self.track_call("on_quote");
2333            self.inner.inner_mut().on_quote(quote)
2334        }
2335
2336        fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
2337            self.track_call("on_trade");
2338            self.inner.inner_mut().on_trade(trade)
2339        }
2340
2341        fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
2342            self.track_call("on_bar");
2343            self.inner.inner_mut().on_bar(bar)
2344        }
2345
2346        fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
2347            self.track_call("on_book");
2348            self.inner.inner_mut().on_book(book)
2349        }
2350
2351        fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
2352            self.track_call("on_book_deltas");
2353            self.inner.inner_mut().on_book_deltas(deltas)
2354        }
2355
2356        fn on_mark_price(&mut self, update: &MarkPriceUpdate) -> anyhow::Result<()> {
2357            self.track_call("on_mark_price");
2358            self.inner.inner_mut().on_mark_price(update)
2359        }
2360
2361        fn on_index_price(&mut self, update: &IndexPriceUpdate) -> anyhow::Result<()> {
2362            self.track_call("on_index_price");
2363            self.inner.inner_mut().on_index_price(update)
2364        }
2365
2366        fn on_instrument_status(&mut self, update: &InstrumentStatus) -> anyhow::Result<()> {
2367            self.track_call("on_instrument_status");
2368            self.inner.inner_mut().on_instrument_status(update)
2369        }
2370
2371        fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
2372            self.track_call("on_instrument_close");
2373            self.inner.inner_mut().on_instrument_close(update)
2374        }
2375
2376        #[cfg(feature = "defi")]
2377        fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
2378            self.track_call("on_block");
2379            self.inner.inner_mut().on_block(block)
2380        }
2381
2382        #[cfg(feature = "defi")]
2383        fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
2384            self.track_call("on_pool");
2385            self.inner.inner_mut().on_pool(pool)
2386        }
2387
2388        #[cfg(feature = "defi")]
2389        fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
2390            self.track_call("on_pool_swap");
2391            self.inner.inner_mut().on_pool_swap(swap)
2392        }
2393
2394        #[cfg(feature = "defi")]
2395        fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
2396            self.track_call("on_pool_liquidity_update");
2397            self.inner.inner_mut().on_pool_liquidity_update(update)
2398        }
2399    }
2400
2401    #[rstest]
2402    fn test_python_on_signal_handler(
2403        clock: Rc<RefCell<TestClock>>,
2404        cache: Rc<RefCell<Cache>>,
2405        trader_id: TraderId,
2406    ) {
2407        pyo3::Python::initialize();
2408        let mut test_actor = TestDataActor::new();
2409        test_actor.reset_tracker();
2410        test_actor.register(trader_id, clock, cache).unwrap();
2411
2412        let signal = Signal::new(
2413            Ustr::from("test_signal"),
2414            "1.0".to_string(),
2415            UnixNanos::default(),
2416            UnixNanos::default(),
2417        );
2418
2419        assert!(test_actor.on_signal(&signal).is_ok());
2420        assert_eq!(test_actor.get_call_count("on_signal"), 1);
2421    }
2422
2423    #[rstest]
2424    fn test_python_on_data_handler(
2425        clock: Rc<RefCell<TestClock>>,
2426        cache: Rc<RefCell<Cache>>,
2427        trader_id: TraderId,
2428    ) {
2429        pyo3::Python::initialize();
2430        let mut test_actor = TestDataActor::new();
2431        test_actor.reset_tracker();
2432        test_actor.register(trader_id, clock, cache).unwrap();
2433
2434        assert!(test_actor.on_data(&()).is_ok());
2435        assert_eq!(test_actor.get_call_count("on_data"), 1);
2436    }
2437
2438    #[rstest]
2439    fn test_python_on_time_event_handler(
2440        clock: Rc<RefCell<TestClock>>,
2441        cache: Rc<RefCell<Cache>>,
2442        trader_id: TraderId,
2443    ) {
2444        pyo3::Python::initialize();
2445        let mut test_actor = TestDataActor::new();
2446        test_actor.reset_tracker();
2447        test_actor.register(trader_id, clock, cache).unwrap();
2448
2449        let time_event = TimeEvent::new(
2450            Ustr::from("test_timer"),
2451            UUID4::new(),
2452            UnixNanos::default(),
2453            UnixNanos::default(),
2454        );
2455
2456        assert!(test_actor.on_time_event(&time_event).is_ok());
2457        assert_eq!(test_actor.get_call_count("on_time_event"), 1);
2458    }
2459
2460    #[rstest]
2461    fn test_python_on_instrument_handler(
2462        clock: Rc<RefCell<TestClock>>,
2463        cache: Rc<RefCell<Cache>>,
2464        trader_id: TraderId,
2465        audusd_sim: CurrencyPair,
2466    ) {
2467        pyo3::Python::initialize();
2468        let mut rust_actor = PyDataActor::new(None);
2469        rust_actor.register(trader_id, clock, cache).unwrap();
2470
2471        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2472
2473        assert!(rust_actor.inner_mut().on_instrument(&instrument).is_ok());
2474    }
2475
2476    #[rstest]
2477    fn test_python_on_quote_handler(
2478        clock: Rc<RefCell<TestClock>>,
2479        cache: Rc<RefCell<Cache>>,
2480        trader_id: TraderId,
2481        audusd_sim: CurrencyPair,
2482    ) {
2483        pyo3::Python::initialize();
2484        let mut rust_actor = PyDataActor::new(None);
2485        rust_actor.register(trader_id, clock, cache).unwrap();
2486
2487        let quote = QuoteTick::new(
2488            audusd_sim.id,
2489            Price::from("1.0000"),
2490            Price::from("1.0001"),
2491            Quantity::from("100000"),
2492            Quantity::from("100000"),
2493            UnixNanos::default(),
2494            UnixNanos::default(),
2495        );
2496
2497        assert!(rust_actor.inner_mut().on_quote(&quote).is_ok());
2498    }
2499
2500    #[rstest]
2501    fn test_python_on_trade_handler(
2502        clock: Rc<RefCell<TestClock>>,
2503        cache: Rc<RefCell<Cache>>,
2504        trader_id: TraderId,
2505        audusd_sim: CurrencyPair,
2506    ) {
2507        pyo3::Python::initialize();
2508        let mut rust_actor = PyDataActor::new(None);
2509        rust_actor.register(trader_id, clock, cache).unwrap();
2510
2511        let trade = TradeTick::new(
2512            audusd_sim.id,
2513            Price::from("1.0000"),
2514            Quantity::from("100000"),
2515            AggressorSide::Buyer,
2516            "T123".to_string().into(),
2517            UnixNanos::default(),
2518            UnixNanos::default(),
2519        );
2520
2521        assert!(rust_actor.inner_mut().on_trade(&trade).is_ok());
2522    }
2523
2524    #[rstest]
2525    fn test_python_on_bar_handler(
2526        clock: Rc<RefCell<TestClock>>,
2527        cache: Rc<RefCell<Cache>>,
2528        trader_id: TraderId,
2529        audusd_sim: CurrencyPair,
2530    ) {
2531        pyo3::Python::initialize();
2532        let mut rust_actor = PyDataActor::new(None);
2533        rust_actor.register(trader_id, clock, cache).unwrap();
2534
2535        let bar_type =
2536            BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap();
2537        let bar = Bar::new(
2538            bar_type,
2539            Price::from("1.0000"),
2540            Price::from("1.0001"),
2541            Price::from("0.9999"),
2542            Price::from("1.0000"),
2543            Quantity::from("100000"),
2544            UnixNanos::default(),
2545            UnixNanos::default(),
2546        );
2547
2548        assert!(rust_actor.inner_mut().on_bar(&bar).is_ok());
2549    }
2550
2551    #[rstest]
2552    fn test_python_on_book_handler(
2553        clock: Rc<RefCell<TestClock>>,
2554        cache: Rc<RefCell<Cache>>,
2555        trader_id: TraderId,
2556        audusd_sim: CurrencyPair,
2557    ) {
2558        pyo3::Python::initialize();
2559        let mut rust_actor = PyDataActor::new(None);
2560        rust_actor.register(trader_id, clock, cache).unwrap();
2561
2562        let book = OrderBook::new(audusd_sim.id, BookType::L2_MBP);
2563        assert!(rust_actor.inner_mut().on_book(&book).is_ok());
2564    }
2565
2566    #[rstest]
2567    fn test_python_on_book_deltas_handler(
2568        clock: Rc<RefCell<TestClock>>,
2569        cache: Rc<RefCell<Cache>>,
2570        trader_id: TraderId,
2571        audusd_sim: CurrencyPair,
2572    ) {
2573        pyo3::Python::initialize();
2574        let mut rust_actor = PyDataActor::new(None);
2575        rust_actor.register(trader_id, clock, cache).unwrap();
2576
2577        let delta =
2578            OrderBookDelta::clear(audusd_sim.id, 0, UnixNanos::default(), UnixNanos::default());
2579        let deltas = OrderBookDeltas::new(audusd_sim.id, vec![delta]);
2580
2581        assert!(rust_actor.inner_mut().on_book_deltas(&deltas).is_ok());
2582    }
2583
2584    #[rstest]
2585    fn test_python_on_mark_price_handler(
2586        clock: Rc<RefCell<TestClock>>,
2587        cache: Rc<RefCell<Cache>>,
2588        trader_id: TraderId,
2589        audusd_sim: CurrencyPair,
2590    ) {
2591        pyo3::Python::initialize();
2592        let mut rust_actor = PyDataActor::new(None);
2593        rust_actor.register(trader_id, clock, cache).unwrap();
2594
2595        let mark_price = MarkPriceUpdate::new(
2596            audusd_sim.id,
2597            Price::from("1.0000"),
2598            UnixNanos::default(),
2599            UnixNanos::default(),
2600        );
2601
2602        assert!(rust_actor.inner_mut().on_mark_price(&mark_price).is_ok());
2603    }
2604
2605    #[rstest]
2606    fn test_python_on_index_price_handler(
2607        clock: Rc<RefCell<TestClock>>,
2608        cache: Rc<RefCell<Cache>>,
2609        trader_id: TraderId,
2610        audusd_sim: CurrencyPair,
2611    ) {
2612        pyo3::Python::initialize();
2613        let mut rust_actor = PyDataActor::new(None);
2614        rust_actor.register(trader_id, clock, cache).unwrap();
2615
2616        let index_price = IndexPriceUpdate::new(
2617            audusd_sim.id,
2618            Price::from("1.0000"),
2619            UnixNanos::default(),
2620            UnixNanos::default(),
2621        );
2622
2623        assert!(rust_actor.inner_mut().on_index_price(&index_price).is_ok());
2624    }
2625
2626    #[rstest]
2627    fn test_python_on_instrument_status_handler(
2628        clock: Rc<RefCell<TestClock>>,
2629        cache: Rc<RefCell<Cache>>,
2630        trader_id: TraderId,
2631        audusd_sim: CurrencyPair,
2632    ) {
2633        pyo3::Python::initialize();
2634        let mut rust_actor = PyDataActor::new(None);
2635        rust_actor.register(trader_id, clock, cache).unwrap();
2636
2637        let status = InstrumentStatus::new(
2638            audusd_sim.id,
2639            MarketStatusAction::Trading,
2640            UnixNanos::default(),
2641            UnixNanos::default(),
2642            None,
2643            None,
2644            None,
2645            None,
2646            None,
2647        );
2648
2649        assert!(rust_actor.inner_mut().on_instrument_status(&status).is_ok());
2650    }
2651
2652    #[rstest]
2653    fn test_python_on_instrument_close_handler(
2654        clock: Rc<RefCell<TestClock>>,
2655        cache: Rc<RefCell<Cache>>,
2656        trader_id: TraderId,
2657        audusd_sim: CurrencyPair,
2658    ) {
2659        pyo3::Python::initialize();
2660        let mut rust_actor = PyDataActor::new(None);
2661        rust_actor.register(trader_id, clock, cache).unwrap();
2662
2663        let close = InstrumentClose::new(
2664            audusd_sim.id,
2665            Price::from("1.0000"),
2666            InstrumentCloseType::EndOfSession,
2667            UnixNanos::default(),
2668            UnixNanos::default(),
2669        );
2670
2671        assert!(rust_actor.inner_mut().on_instrument_close(&close).is_ok());
2672    }
2673
2674    #[cfg(feature = "defi")]
2675    #[rstest]
2676    fn test_python_on_block_handler(
2677        clock: Rc<RefCell<TestClock>>,
2678        cache: Rc<RefCell<Cache>>,
2679        trader_id: TraderId,
2680    ) {
2681        pyo3::Python::initialize();
2682        let mut test_actor = TestDataActor::new();
2683        test_actor.reset_tracker();
2684        test_actor.register(trader_id, clock, cache).unwrap();
2685
2686        let block = Block::new(
2687            "0x1234567890abcdef".to_string(),
2688            "0xabcdef1234567890".to_string(),
2689            12345,
2690            "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0".into(),
2691            21000,
2692            20000,
2693            UnixNanos::default(),
2694            Some(Blockchain::Ethereum),
2695        );
2696
2697        assert!(test_actor.on_block(&block).is_ok());
2698        assert_eq!(test_actor.get_call_count("on_block"), 1);
2699    }
2700
2701    #[cfg(feature = "defi")]
2702    #[rstest]
2703    fn test_python_on_pool_swap_handler(
2704        clock: Rc<RefCell<TestClock>>,
2705        cache: Rc<RefCell<Cache>>,
2706        trader_id: TraderId,
2707    ) {
2708        pyo3::Python::initialize();
2709        let mut rust_actor = PyDataActor::new(None);
2710        rust_actor.register(trader_id, clock, cache).unwrap();
2711
2712        let chain = Arc::new(Chain::new(Blockchain::Ethereum, 1));
2713        let dex = Arc::new(Dex::new(
2714            Chain::new(Blockchain::Ethereum, 1),
2715            DexType::UniswapV3,
2716            "0x1F98431c8aD98523631AE4a59f267346ea31F984",
2717            0,
2718            AmmType::CLAMM,
2719            "PoolCreated",
2720            "Swap",
2721            "Mint",
2722            "Burn",
2723            "Collect",
2724        ));
2725        let token0 = Token::new(
2726            chain.clone(),
2727            "0xa0b86a33e6441c8c06dd7b111a8c4e82e2b2a5e1"
2728                .parse()
2729                .unwrap(),
2730            "USDC".into(),
2731            "USD Coin".into(),
2732            6,
2733        );
2734        let token1 = Token::new(
2735            chain.clone(),
2736            "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
2737                .parse()
2738                .unwrap(),
2739            "WETH".into(),
2740            "Wrapped Ether".into(),
2741            18,
2742        );
2743        let pool_address = "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8"
2744            .parse()
2745            .unwrap();
2746        let pool_identifier: PoolIdentifier = "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8"
2747            .parse()
2748            .unwrap();
2749        let pool = Arc::new(Pool::new(
2750            chain.clone(),
2751            dex.clone(),
2752            pool_address,
2753            pool_identifier,
2754            12345,
2755            token0,
2756            token1,
2757            Some(500),
2758            Some(10),
2759            UnixNanos::default(),
2760        ));
2761
2762        let swap = PoolSwap::new(
2763            chain,
2764            dex,
2765            pool.instrument_id,
2766            pool.pool_identifier,
2767            12345,
2768            "0xabc123".to_string(),
2769            0,
2770            0,
2771            None,
2772            "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
2773                .parse()
2774                .unwrap(),
2775            "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
2776                .parse()
2777                .unwrap(),
2778            I256::from_str("1000000000000000000").unwrap(),
2779            I256::from_str("400000000000000").unwrap(),
2780            U160::from(59000000000000u128),
2781            1000000,
2782            100,
2783        );
2784
2785        assert!(rust_actor.inner_mut().on_pool_swap(&swap).is_ok());
2786    }
2787
2788    #[cfg(feature = "defi")]
2789    #[rstest]
2790    fn test_python_on_pool_liquidity_update_handler(
2791        clock: Rc<RefCell<TestClock>>,
2792        cache: Rc<RefCell<Cache>>,
2793        trader_id: TraderId,
2794    ) {
2795        pyo3::Python::initialize();
2796        let mut rust_actor = PyDataActor::new(None);
2797        rust_actor.register(trader_id, clock, cache).unwrap();
2798
2799        let block = Block::new(
2800            "0x1234567890abcdef".to_string(),
2801            "0xabcdef1234567890".to_string(),
2802            12345,
2803            "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0".into(),
2804            21000,
2805            20000,
2806            UnixNanos::default(),
2807            Some(Blockchain::Ethereum),
2808        );
2809
2810        // We test on_block since PoolLiquidityUpdate construction is complex
2811        assert!(rust_actor.inner_mut().on_block(&block).is_ok());
2812    }
2813
2814    const TRACKING_ACTOR_CODE: &std::ffi::CStr = c_str!(
2815        r#"
2816class TrackingActor:
2817    """A mock Python actor that tracks all method calls."""
2818
2819    def __init__(self):
2820        self.calls = []
2821
2822    def _record(self, method_name, *args):
2823        self.calls.append((method_name, args))
2824
2825    def was_called(self, method_name):
2826        return any(call[0] == method_name for call in self.calls)
2827
2828    def call_count(self, method_name):
2829        return sum(1 for call in self.calls if call[0] == method_name)
2830
2831    def on_start(self):
2832        self._record("on_start")
2833
2834    def on_stop(self):
2835        self._record("on_stop")
2836
2837    def on_resume(self):
2838        self._record("on_resume")
2839
2840    def on_reset(self):
2841        self._record("on_reset")
2842
2843    def on_dispose(self):
2844        self._record("on_dispose")
2845
2846    def on_degrade(self):
2847        self._record("on_degrade")
2848
2849    def on_fault(self):
2850        self._record("on_fault")
2851
2852    def on_time_event(self, event):
2853        self._record("on_time_event", event)
2854
2855    def on_data(self, data):
2856        self._record("on_data", data)
2857
2858    def on_signal(self, signal):
2859        self._record("on_signal", signal)
2860
2861    def on_instrument(self, instrument):
2862        self._record("on_instrument", instrument)
2863
2864    def on_quote(self, quote):
2865        self._record("on_quote", quote)
2866
2867    def on_trade(self, trade):
2868        self._record("on_trade", trade)
2869
2870    def on_bar(self, bar):
2871        self._record("on_bar", bar)
2872
2873    def on_book(self, book):
2874        self._record("on_book", book)
2875
2876    def on_book_deltas(self, deltas):
2877        self._record("on_book_deltas", deltas)
2878
2879    def on_mark_price(self, update):
2880        self._record("on_mark_price", update)
2881
2882    def on_index_price(self, update):
2883        self._record("on_index_price", update)
2884
2885    def on_funding_rate(self, update):
2886        self._record("on_funding_rate", update)
2887
2888    def on_instrument_status(self, status):
2889        self._record("on_instrument_status", status)
2890
2891    def on_instrument_close(self, close):
2892        self._record("on_instrument_close", close)
2893
2894    def on_historical_data(self, data):
2895        self._record("on_historical_data", data)
2896
2897    def on_historical_quotes(self, quotes):
2898        self._record("on_historical_quotes", quotes)
2899
2900    def on_historical_trades(self, trades):
2901        self._record("on_historical_trades", trades)
2902
2903    def on_historical_bars(self, bars):
2904        self._record("on_historical_bars", bars)
2905
2906    def on_historical_mark_prices(self, prices):
2907        self._record("on_historical_mark_prices", prices)
2908
2909    def on_historical_index_prices(self, prices):
2910        self._record("on_historical_index_prices", prices)
2911"#
2912    );
2913
2914    fn create_tracking_python_actor(py: Python<'_>) -> PyResult<Py<PyAny>> {
2915        py.run(TRACKING_ACTOR_CODE, None, None)?;
2916        let tracking_actor_class = py.eval(c_str!("TrackingActor"), None, None)?;
2917        let instance = tracking_actor_class.call0()?;
2918        Ok(instance.unbind())
2919    }
2920
2921    fn python_method_was_called(py_actor: &Py<PyAny>, py: Python<'_>, method_name: &str) -> bool {
2922        py_actor
2923            .call_method1(py, "was_called", (method_name,))
2924            .and_then(|r| r.extract::<bool>(py))
2925            .unwrap_or(false)
2926    }
2927
2928    fn python_method_call_count(py_actor: &Py<PyAny>, py: Python<'_>, method_name: &str) -> i32 {
2929        py_actor
2930            .call_method1(py, "call_count", (method_name,))
2931            .and_then(|r| r.extract::<i32>(py))
2932            .unwrap_or(0)
2933    }
2934
2935    #[rstest]
2936    fn test_python_dispatch_on_start(
2937        clock: Rc<RefCell<TestClock>>,
2938        cache: Rc<RefCell<Cache>>,
2939        trader_id: TraderId,
2940    ) {
2941        pyo3::Python::initialize();
2942        Python::attach(|py| {
2943            let py_actor = create_tracking_python_actor(py).unwrap();
2944
2945            let mut rust_actor = PyDataActor::new(None);
2946            rust_actor.set_python_instance(py_actor.clone_ref(py));
2947            rust_actor.register(trader_id, clock, cache).unwrap();
2948
2949            let result = DataActor::on_start(rust_actor.inner_mut());
2950
2951            assert!(result.is_ok());
2952            assert!(python_method_was_called(&py_actor, py, "on_start"));
2953            assert_eq!(python_method_call_count(&py_actor, py, "on_start"), 1);
2954        });
2955    }
2956
2957    #[rstest]
2958    fn test_python_dispatch_on_stop(
2959        clock: Rc<RefCell<TestClock>>,
2960        cache: Rc<RefCell<Cache>>,
2961        trader_id: TraderId,
2962    ) {
2963        pyo3::Python::initialize();
2964        Python::attach(|py| {
2965            let py_actor = create_tracking_python_actor(py).unwrap();
2966
2967            let mut rust_actor = PyDataActor::new(None);
2968            rust_actor.set_python_instance(py_actor.clone_ref(py));
2969            rust_actor.register(trader_id, clock, cache).unwrap();
2970
2971            let result = DataActor::on_stop(rust_actor.inner_mut());
2972
2973            assert!(result.is_ok());
2974            assert!(python_method_was_called(&py_actor, py, "on_stop"));
2975        });
2976    }
2977
2978    #[rstest]
2979    fn test_python_dispatch_on_resume(
2980        clock: Rc<RefCell<TestClock>>,
2981        cache: Rc<RefCell<Cache>>,
2982        trader_id: TraderId,
2983    ) {
2984        pyo3::Python::initialize();
2985        Python::attach(|py| {
2986            let py_actor = create_tracking_python_actor(py).unwrap();
2987
2988            let mut rust_actor = PyDataActor::new(None);
2989            rust_actor.set_python_instance(py_actor.clone_ref(py));
2990            rust_actor.register(trader_id, clock, cache).unwrap();
2991
2992            let result = DataActor::on_resume(rust_actor.inner_mut());
2993
2994            assert!(result.is_ok());
2995            assert!(python_method_was_called(&py_actor, py, "on_resume"));
2996        });
2997    }
2998
2999    #[rstest]
3000    fn test_python_dispatch_on_reset(
3001        clock: Rc<RefCell<TestClock>>,
3002        cache: Rc<RefCell<Cache>>,
3003        trader_id: TraderId,
3004    ) {
3005        pyo3::Python::initialize();
3006        Python::attach(|py| {
3007            let py_actor = create_tracking_python_actor(py).unwrap();
3008
3009            let mut rust_actor = PyDataActor::new(None);
3010            rust_actor.set_python_instance(py_actor.clone_ref(py));
3011            rust_actor.register(trader_id, clock, cache).unwrap();
3012
3013            let result = DataActor::on_reset(rust_actor.inner_mut());
3014
3015            assert!(result.is_ok());
3016            assert!(python_method_was_called(&py_actor, py, "on_reset"));
3017        });
3018    }
3019
3020    #[rstest]
3021    fn test_python_dispatch_on_dispose(
3022        clock: Rc<RefCell<TestClock>>,
3023        cache: Rc<RefCell<Cache>>,
3024        trader_id: TraderId,
3025    ) {
3026        pyo3::Python::initialize();
3027        Python::attach(|py| {
3028            let py_actor = create_tracking_python_actor(py).unwrap();
3029
3030            let mut rust_actor = PyDataActor::new(None);
3031            rust_actor.set_python_instance(py_actor.clone_ref(py));
3032            rust_actor.register(trader_id, clock, cache).unwrap();
3033
3034            let result = DataActor::on_dispose(rust_actor.inner_mut());
3035
3036            assert!(result.is_ok());
3037            assert!(python_method_was_called(&py_actor, py, "on_dispose"));
3038        });
3039    }
3040
3041    #[rstest]
3042    fn test_python_dispatch_on_degrade(
3043        clock: Rc<RefCell<TestClock>>,
3044        cache: Rc<RefCell<Cache>>,
3045        trader_id: TraderId,
3046    ) {
3047        pyo3::Python::initialize();
3048        Python::attach(|py| {
3049            let py_actor = create_tracking_python_actor(py).unwrap();
3050
3051            let mut rust_actor = PyDataActor::new(None);
3052            rust_actor.set_python_instance(py_actor.clone_ref(py));
3053            rust_actor.register(trader_id, clock, cache).unwrap();
3054
3055            let result = DataActor::on_degrade(rust_actor.inner_mut());
3056
3057            assert!(result.is_ok());
3058            assert!(python_method_was_called(&py_actor, py, "on_degrade"));
3059        });
3060    }
3061
3062    #[rstest]
3063    fn test_python_dispatch_on_fault(
3064        clock: Rc<RefCell<TestClock>>,
3065        cache: Rc<RefCell<Cache>>,
3066        trader_id: TraderId,
3067    ) {
3068        pyo3::Python::initialize();
3069        Python::attach(|py| {
3070            let py_actor = create_tracking_python_actor(py).unwrap();
3071
3072            let mut rust_actor = PyDataActor::new(None);
3073            rust_actor.set_python_instance(py_actor.clone_ref(py));
3074            rust_actor.register(trader_id, clock, cache).unwrap();
3075
3076            let result = DataActor::on_fault(rust_actor.inner_mut());
3077
3078            assert!(result.is_ok());
3079            assert!(python_method_was_called(&py_actor, py, "on_fault"));
3080        });
3081    }
3082
3083    #[rstest]
3084    fn test_python_dispatch_on_signal(
3085        clock: Rc<RefCell<TestClock>>,
3086        cache: Rc<RefCell<Cache>>,
3087        trader_id: TraderId,
3088    ) {
3089        pyo3::Python::initialize();
3090        Python::attach(|py| {
3091            let py_actor = create_tracking_python_actor(py).unwrap();
3092
3093            let mut rust_actor = PyDataActor::new(None);
3094            rust_actor.set_python_instance(py_actor.clone_ref(py));
3095            rust_actor.register(trader_id, clock, cache).unwrap();
3096
3097            let signal = Signal::new(
3098                Ustr::from("test_signal"),
3099                "1.0".to_string(),
3100                UnixNanos::default(),
3101                UnixNanos::default(),
3102            );
3103
3104            let result = rust_actor.inner_mut().on_signal(&signal);
3105
3106            assert!(result.is_ok());
3107            assert!(python_method_was_called(&py_actor, py, "on_signal"));
3108        });
3109    }
3110
3111    #[rstest]
3112    fn test_python_dispatch_on_time_event(
3113        clock: Rc<RefCell<TestClock>>,
3114        cache: Rc<RefCell<Cache>>,
3115        trader_id: TraderId,
3116    ) {
3117        pyo3::Python::initialize();
3118        Python::attach(|py| {
3119            let py_actor = create_tracking_python_actor(py).unwrap();
3120
3121            let mut rust_actor = PyDataActor::new(None);
3122            rust_actor.set_python_instance(py_actor.clone_ref(py));
3123            rust_actor.register(trader_id, clock, cache).unwrap();
3124
3125            let time_event = TimeEvent::new(
3126                Ustr::from("test_timer"),
3127                UUID4::new(),
3128                UnixNanos::default(),
3129                UnixNanos::default(),
3130            );
3131
3132            let result = rust_actor.inner_mut().on_time_event(&time_event);
3133
3134            assert!(result.is_ok());
3135            assert!(python_method_was_called(&py_actor, py, "on_time_event"));
3136        });
3137    }
3138
3139    #[rstest]
3140    fn test_python_dispatch_on_instrument(
3141        clock: Rc<RefCell<TestClock>>,
3142        cache: Rc<RefCell<Cache>>,
3143        trader_id: TraderId,
3144        audusd_sim: CurrencyPair,
3145    ) {
3146        pyo3::Python::initialize();
3147        Python::attach(|py| {
3148            let py_actor = create_tracking_python_actor(py).unwrap();
3149
3150            let mut rust_actor = PyDataActor::new(None);
3151            rust_actor.set_python_instance(py_actor.clone_ref(py));
3152            rust_actor.register(trader_id, clock, cache).unwrap();
3153
3154            let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3155
3156            let result = rust_actor.inner_mut().on_instrument(&instrument);
3157
3158            assert!(result.is_ok());
3159            assert!(python_method_was_called(&py_actor, py, "on_instrument"));
3160        });
3161    }
3162
3163    #[rstest]
3164    fn test_python_dispatch_on_quote(
3165        clock: Rc<RefCell<TestClock>>,
3166        cache: Rc<RefCell<Cache>>,
3167        trader_id: TraderId,
3168        audusd_sim: CurrencyPair,
3169    ) {
3170        pyo3::Python::initialize();
3171        Python::attach(|py| {
3172            let py_actor = create_tracking_python_actor(py).unwrap();
3173
3174            let mut rust_actor = PyDataActor::new(None);
3175            rust_actor.set_python_instance(py_actor.clone_ref(py));
3176            rust_actor.register(trader_id, clock, cache).unwrap();
3177
3178            let quote = QuoteTick::new(
3179                audusd_sim.id,
3180                Price::from("1.00000"),
3181                Price::from("1.00001"),
3182                Quantity::from(100_000),
3183                Quantity::from(100_000),
3184                UnixNanos::default(),
3185                UnixNanos::default(),
3186            );
3187
3188            let result = rust_actor.inner_mut().on_quote(&quote);
3189
3190            assert!(result.is_ok());
3191            assert!(python_method_was_called(&py_actor, py, "on_quote"));
3192        });
3193    }
3194
3195    #[rstest]
3196    fn test_python_dispatch_on_trade(
3197        clock: Rc<RefCell<TestClock>>,
3198        cache: Rc<RefCell<Cache>>,
3199        trader_id: TraderId,
3200        audusd_sim: CurrencyPair,
3201    ) {
3202        pyo3::Python::initialize();
3203        Python::attach(|py| {
3204            let py_actor = create_tracking_python_actor(py).unwrap();
3205
3206            let mut rust_actor = PyDataActor::new(None);
3207            rust_actor.set_python_instance(py_actor.clone_ref(py));
3208            rust_actor.register(trader_id, clock, cache).unwrap();
3209
3210            let trade = TradeTick::new(
3211                audusd_sim.id,
3212                Price::from("1.00000"),
3213                Quantity::from(100_000),
3214                AggressorSide::Buyer,
3215                TradeId::new("123456"),
3216                UnixNanos::default(),
3217                UnixNanos::default(),
3218            );
3219
3220            let result = rust_actor.inner_mut().on_trade(&trade);
3221
3222            assert!(result.is_ok());
3223            assert!(python_method_was_called(&py_actor, py, "on_trade"));
3224        });
3225    }
3226
3227    #[rstest]
3228    fn test_python_dispatch_on_bar(
3229        clock: Rc<RefCell<TestClock>>,
3230        cache: Rc<RefCell<Cache>>,
3231        trader_id: TraderId,
3232        audusd_sim: CurrencyPair,
3233    ) {
3234        pyo3::Python::initialize();
3235        Python::attach(|py| {
3236            let py_actor = create_tracking_python_actor(py).unwrap();
3237
3238            let mut rust_actor = PyDataActor::new(None);
3239            rust_actor.set_python_instance(py_actor.clone_ref(py));
3240            rust_actor.register(trader_id, clock, cache).unwrap();
3241
3242            let bar_type =
3243                BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap();
3244            let bar = Bar::new(
3245                bar_type,
3246                Price::from("1.00000"),
3247                Price::from("1.00010"),
3248                Price::from("0.99990"),
3249                Price::from("1.00005"),
3250                Quantity::from(100_000),
3251                UnixNanos::default(),
3252                UnixNanos::default(),
3253            );
3254
3255            let result = rust_actor.inner_mut().on_bar(&bar);
3256
3257            assert!(result.is_ok());
3258            assert!(python_method_was_called(&py_actor, py, "on_bar"));
3259        });
3260    }
3261
3262    #[rstest]
3263    fn test_python_dispatch_on_book(
3264        clock: Rc<RefCell<TestClock>>,
3265        cache: Rc<RefCell<Cache>>,
3266        trader_id: TraderId,
3267        audusd_sim: CurrencyPair,
3268    ) {
3269        pyo3::Python::initialize();
3270        Python::attach(|py| {
3271            let py_actor = create_tracking_python_actor(py).unwrap();
3272
3273            let mut rust_actor = PyDataActor::new(None);
3274            rust_actor.set_python_instance(py_actor.clone_ref(py));
3275            rust_actor.register(trader_id, clock, cache).unwrap();
3276
3277            let book = OrderBook::new(audusd_sim.id, BookType::L2_MBP);
3278
3279            let result = rust_actor.inner_mut().on_book(&book);
3280
3281            assert!(result.is_ok());
3282            assert!(python_method_was_called(&py_actor, py, "on_book"));
3283        });
3284    }
3285
3286    #[rstest]
3287    fn test_python_dispatch_on_book_deltas(
3288        clock: Rc<RefCell<TestClock>>,
3289        cache: Rc<RefCell<Cache>>,
3290        trader_id: TraderId,
3291        audusd_sim: CurrencyPair,
3292    ) {
3293        pyo3::Python::initialize();
3294        Python::attach(|py| {
3295            let py_actor = create_tracking_python_actor(py).unwrap();
3296
3297            let mut rust_actor = PyDataActor::new(None);
3298            rust_actor.set_python_instance(py_actor.clone_ref(py));
3299            rust_actor.register(trader_id, clock, cache).unwrap();
3300
3301            let delta =
3302                OrderBookDelta::clear(audusd_sim.id, 0, UnixNanos::default(), UnixNanos::default());
3303            let deltas = OrderBookDeltas::new(audusd_sim.id, vec![delta]);
3304
3305            let result = rust_actor.inner_mut().on_book_deltas(&deltas);
3306
3307            assert!(result.is_ok());
3308            assert!(python_method_was_called(&py_actor, py, "on_book_deltas"));
3309        });
3310    }
3311
3312    #[rstest]
3313    fn test_python_dispatch_on_mark_price(
3314        clock: Rc<RefCell<TestClock>>,
3315        cache: Rc<RefCell<Cache>>,
3316        trader_id: TraderId,
3317        audusd_sim: CurrencyPair,
3318    ) {
3319        pyo3::Python::initialize();
3320        Python::attach(|py| {
3321            let py_actor = create_tracking_python_actor(py).unwrap();
3322
3323            let mut rust_actor = PyDataActor::new(None);
3324            rust_actor.set_python_instance(py_actor.clone_ref(py));
3325            rust_actor.register(trader_id, clock, cache).unwrap();
3326
3327            let mark_price = MarkPriceUpdate::new(
3328                audusd_sim.id,
3329                Price::from("1.00000"),
3330                UnixNanos::default(),
3331                UnixNanos::default(),
3332            );
3333
3334            let result = rust_actor.inner_mut().on_mark_price(&mark_price);
3335
3336            assert!(result.is_ok());
3337            assert!(python_method_was_called(&py_actor, py, "on_mark_price"));
3338        });
3339    }
3340
3341    #[rstest]
3342    fn test_python_dispatch_on_index_price(
3343        clock: Rc<RefCell<TestClock>>,
3344        cache: Rc<RefCell<Cache>>,
3345        trader_id: TraderId,
3346        audusd_sim: CurrencyPair,
3347    ) {
3348        pyo3::Python::initialize();
3349        Python::attach(|py| {
3350            let py_actor = create_tracking_python_actor(py).unwrap();
3351
3352            let mut rust_actor = PyDataActor::new(None);
3353            rust_actor.set_python_instance(py_actor.clone_ref(py));
3354            rust_actor.register(trader_id, clock, cache).unwrap();
3355
3356            let index_price = IndexPriceUpdate::new(
3357                audusd_sim.id,
3358                Price::from("1.00000"),
3359                UnixNanos::default(),
3360                UnixNanos::default(),
3361            );
3362
3363            let result = rust_actor.inner_mut().on_index_price(&index_price);
3364
3365            assert!(result.is_ok());
3366            assert!(python_method_was_called(&py_actor, py, "on_index_price"));
3367        });
3368    }
3369
3370    #[rstest]
3371    fn test_python_dispatch_on_instrument_status(
3372        clock: Rc<RefCell<TestClock>>,
3373        cache: Rc<RefCell<Cache>>,
3374        trader_id: TraderId,
3375        audusd_sim: CurrencyPair,
3376    ) {
3377        pyo3::Python::initialize();
3378        Python::attach(|py| {
3379            let py_actor = create_tracking_python_actor(py).unwrap();
3380
3381            let mut rust_actor = PyDataActor::new(None);
3382            rust_actor.set_python_instance(py_actor.clone_ref(py));
3383            rust_actor.register(trader_id, clock, cache).unwrap();
3384
3385            let status = InstrumentStatus::new(
3386                audusd_sim.id,
3387                MarketStatusAction::Trading,
3388                UnixNanos::default(),
3389                UnixNanos::default(),
3390                None,
3391                None,
3392                None,
3393                None,
3394                None,
3395            );
3396
3397            let result = rust_actor.inner_mut().on_instrument_status(&status);
3398
3399            assert!(result.is_ok());
3400            assert!(python_method_was_called(
3401                &py_actor,
3402                py,
3403                "on_instrument_status"
3404            ));
3405        });
3406    }
3407
3408    #[rstest]
3409    fn test_python_dispatch_on_instrument_close(
3410        clock: Rc<RefCell<TestClock>>,
3411        cache: Rc<RefCell<Cache>>,
3412        trader_id: TraderId,
3413        audusd_sim: CurrencyPair,
3414    ) {
3415        pyo3::Python::initialize();
3416        Python::attach(|py| {
3417            let py_actor = create_tracking_python_actor(py).unwrap();
3418
3419            let mut rust_actor = PyDataActor::new(None);
3420            rust_actor.set_python_instance(py_actor.clone_ref(py));
3421            rust_actor.register(trader_id, clock, cache).unwrap();
3422
3423            let close = InstrumentClose::new(
3424                audusd_sim.id,
3425                Price::from("1.00000"),
3426                InstrumentCloseType::EndOfSession,
3427                UnixNanos::default(),
3428                UnixNanos::default(),
3429            );
3430
3431            let result = rust_actor.inner_mut().on_instrument_close(&close);
3432
3433            assert!(result.is_ok());
3434            assert!(python_method_was_called(
3435                &py_actor,
3436                py,
3437                "on_instrument_close"
3438            ));
3439        });
3440    }
3441
3442    #[rstest]
3443    fn test_python_dispatch_multiple_calls_tracked(
3444        clock: Rc<RefCell<TestClock>>,
3445        cache: Rc<RefCell<Cache>>,
3446        trader_id: TraderId,
3447        audusd_sim: CurrencyPair,
3448    ) {
3449        pyo3::Python::initialize();
3450        Python::attach(|py| {
3451            let py_actor = create_tracking_python_actor(py).unwrap();
3452
3453            let mut rust_actor = PyDataActor::new(None);
3454            rust_actor.set_python_instance(py_actor.clone_ref(py));
3455            rust_actor.register(trader_id, clock, cache).unwrap();
3456
3457            let quote = QuoteTick::new(
3458                audusd_sim.id,
3459                Price::from("1.00000"),
3460                Price::from("1.00001"),
3461                Quantity::from(100_000),
3462                Quantity::from(100_000),
3463                UnixNanos::default(),
3464                UnixNanos::default(),
3465            );
3466
3467            rust_actor.inner_mut().on_quote(&quote).unwrap();
3468            rust_actor.inner_mut().on_quote(&quote).unwrap();
3469            rust_actor.inner_mut().on_quote(&quote).unwrap();
3470
3471            assert_eq!(python_method_call_count(&py_actor, py, "on_quote"), 3);
3472        });
3473    }
3474
3475    #[rstest]
3476    fn test_python_dispatch_no_call_when_py_self_not_set(
3477        clock: Rc<RefCell<TestClock>>,
3478        cache: Rc<RefCell<Cache>>,
3479        trader_id: TraderId,
3480    ) {
3481        pyo3::Python::initialize();
3482        Python::attach(|_py| {
3483            let mut rust_actor = PyDataActor::new(None);
3484            rust_actor.register(trader_id, clock, cache).unwrap();
3485
3486            // When py_self is None, the dispatch returns Ok(()) without calling Python
3487            let result = DataActor::on_start(rust_actor.inner_mut());
3488            assert!(result.is_ok());
3489        });
3490    }
3491}