1use std::{
17 any::Any,
18 cell::{RefCell, UnsafeCell},
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroUsize,
22 ops::{Deref, DerefMut},
23 rc::Rc,
24};
25
26use nautilus_core::{
27 from_pydict,
28 nanos::UnixNanos,
29 python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err},
30};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33 Block, Blockchain, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap,
34};
35use nautilus_model::{
36 data::{
37 Bar, BarType, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38 MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
39 },
40 enums::BookType,
41 identifiers::{ActorId, ClientId, InstrumentId, TraderId, Venue},
42 instruments::InstrumentAny,
43 orderbook::OrderBook,
44 python::instruments::instrument_any_to_pyobject,
45};
46use pyo3::{prelude::*, types::PyDict};
47
48use crate::{
49 actor::{
50 Actor, DataActor,
51 data_actor::{DataActorConfig, DataActorCore, ImportableActorConfig},
52 registry::{get_actor_registry, try_get_actor_unchecked},
53 },
54 cache::Cache,
55 clock::Clock,
56 component::{Component, get_component_registry},
57 enums::ComponentState,
58 python::{cache::PyCache, clock::PyClock, logging::PyLogger},
59 signal::Signal,
60 timer::{TimeEvent, TimeEventCallback},
61};
62
63#[pyo3::pymethods]
64impl DataActorConfig {
65 #[new]
66 #[pyo3(signature = (actor_id=None, log_events=true, log_commands=true))]
67 fn py_new(actor_id: Option<ActorId>, log_events: bool, log_commands: bool) -> Self {
68 Self {
69 actor_id,
70 log_events,
71 log_commands,
72 }
73 }
74}
75
76#[pyo3::pymethods]
77impl ImportableActorConfig {
78 #[new]
79 fn py_new(actor_path: String, config_path: String, config: Py<PyDict>) -> PyResult<Self> {
80 let json_config = Python::attach(|py| -> PyResult<HashMap<String, serde_json::Value>> {
81 let kwargs = PyDict::new(py);
82 kwargs.set_item("default", py.eval(pyo3::ffi::c_str!("str"), None, None)?)?;
83 let json_str: String = PyModule::import(py, "json")?
84 .call_method("dumps", (config.bind(py),), Some(&kwargs))?
85 .extract()?;
86
87 let json_value: serde_json::Value =
88 serde_json::from_str(&json_str).map_err(to_pyvalue_err)?;
89
90 if let serde_json::Value::Object(map) = json_value {
91 Ok(map.into_iter().collect())
92 } else {
93 Err(to_pyvalue_err("Config must be a dictionary"))
94 }
95 })?;
96
97 Ok(Self {
98 actor_path,
99 config_path,
100 config: json_config,
101 })
102 }
103
104 #[getter]
105 fn actor_path(&self) -> &String {
106 &self.actor_path
107 }
108
109 #[getter]
110 fn config_path(&self) -> &String {
111 &self.config_path
112 }
113
114 #[getter]
115 fn config(&self, py: Python<'_>) -> PyResult<Py<PyDict>> {
116 let py_dict = PyDict::new(py);
118 for (key, value) in &self.config {
119 let json_str = serde_json::to_string(value).map_err(to_pyvalue_err)?;
121 let py_value = PyModule::import(py, "json")?.call_method("loads", (json_str,), None)?;
122 py_dict.set_item(key, py_value)?;
123 }
124 Ok(py_dict.unbind())
125 }
126}
127
128pub struct PyDataActorInner {
134 core: DataActorCore,
135 py_self: Option<Py<PyAny>>,
136 clock: PyClock,
137 logger: PyLogger,
138}
139
140impl Debug for PyDataActorInner {
141 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142 f.debug_struct(stringify!(PyDataActorInner))
143 .field("core", &self.core)
144 .field("py_self", &self.py_self.as_ref().map(|_| "<Py<PyAny>>"))
145 .field("clock", &self.clock)
146 .field("logger", &self.logger)
147 .finish()
148 }
149}
150
151impl Deref for PyDataActorInner {
152 type Target = DataActorCore;
153
154 fn deref(&self) -> &Self::Target {
155 &self.core
156 }
157}
158
159impl DerefMut for PyDataActorInner {
160 fn deref_mut(&mut self) -> &mut Self::Target {
161 &mut self.core
162 }
163}
164
165impl PyDataActorInner {
166 fn dispatch_on_start(&self) -> PyResult<()> {
167 if let Some(ref py_self) = self.py_self {
168 Python::attach(|py| py_self.call_method0(py, "on_start"))?;
169 }
170 Ok(())
171 }
172
173 fn dispatch_on_stop(&mut self) -> PyResult<()> {
174 if let Some(ref py_self) = self.py_self {
175 Python::attach(|py| py_self.call_method0(py, "on_stop"))?;
176 }
177 Ok(())
178 }
179
180 fn dispatch_on_resume(&mut self) -> PyResult<()> {
181 if let Some(ref py_self) = self.py_self {
182 Python::attach(|py| py_self.call_method0(py, "on_resume"))?;
183 }
184 Ok(())
185 }
186
187 fn dispatch_on_reset(&mut self) -> PyResult<()> {
188 if let Some(ref py_self) = self.py_self {
189 Python::attach(|py| py_self.call_method0(py, "on_reset"))?;
190 }
191 Ok(())
192 }
193
194 fn dispatch_on_dispose(&mut self) -> PyResult<()> {
195 if let Some(ref py_self) = self.py_self {
196 Python::attach(|py| py_self.call_method0(py, "on_dispose"))?;
197 }
198 Ok(())
199 }
200
201 fn dispatch_on_degrade(&mut self) -> PyResult<()> {
202 if let Some(ref py_self) = self.py_self {
203 Python::attach(|py| py_self.call_method0(py, "on_degrade"))?;
204 }
205 Ok(())
206 }
207
208 fn dispatch_on_fault(&mut self) -> PyResult<()> {
209 if let Some(ref py_self) = self.py_self {
210 Python::attach(|py| py_self.call_method0(py, "on_fault"))?;
211 }
212 Ok(())
213 }
214
215 fn dispatch_on_time_event(&mut self, event: TimeEvent) -> PyResult<()> {
216 if let Some(ref py_self) = self.py_self {
217 Python::attach(|py| {
218 py_self.call_method1(py, "on_time_event", (event.into_py_any_unwrap(py),))
219 })?;
220 }
221 Ok(())
222 }
223
224 fn dispatch_on_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
225 if let Some(ref py_self) = self.py_self {
226 Python::attach(|py| py_self.call_method1(py, "on_data", (data,)))?;
227 }
228 Ok(())
229 }
230
231 fn dispatch_on_signal(&mut self, signal: &Signal) -> PyResult<()> {
232 if let Some(ref py_self) = self.py_self {
233 Python::attach(|py| {
234 py_self.call_method1(py, "on_signal", (signal.clone().into_py_any_unwrap(py),))
235 })?;
236 }
237 Ok(())
238 }
239
240 fn dispatch_on_instrument(&mut self, instrument: Py<PyAny>) -> PyResult<()> {
241 if let Some(ref py_self) = self.py_self {
242 Python::attach(|py| py_self.call_method1(py, "on_instrument", (instrument,)))?;
243 }
244 Ok(())
245 }
246
247 fn dispatch_on_quote(&mut self, quote: QuoteTick) -> PyResult<()> {
248 if let Some(ref py_self) = self.py_self {
249 Python::attach(|py| {
250 py_self.call_method1(py, "on_quote", (quote.into_py_any_unwrap(py),))
251 })?;
252 }
253 Ok(())
254 }
255
256 fn dispatch_on_trade(&mut self, trade: TradeTick) -> PyResult<()> {
257 if let Some(ref py_self) = self.py_self {
258 Python::attach(|py| {
259 py_self.call_method1(py, "on_trade", (trade.into_py_any_unwrap(py),))
260 })?;
261 }
262 Ok(())
263 }
264
265 fn dispatch_on_bar(&mut self, bar: Bar) -> PyResult<()> {
266 if let Some(ref py_self) = self.py_self {
267 Python::attach(|py| py_self.call_method1(py, "on_bar", (bar.into_py_any_unwrap(py),)))?;
268 }
269 Ok(())
270 }
271
272 fn dispatch_on_book_deltas(&mut self, deltas: OrderBookDeltas) -> PyResult<()> {
273 if let Some(ref py_self) = self.py_self {
274 Python::attach(|py| {
275 py_self.call_method1(py, "on_book_deltas", (deltas.into_py_any_unwrap(py),))
276 })?;
277 }
278 Ok(())
279 }
280
281 fn dispatch_on_book(&mut self, book: &OrderBook) -> PyResult<()> {
282 if let Some(ref py_self) = self.py_self {
283 Python::attach(|py| {
284 py_self.call_method1(py, "on_book", (book.clone().into_py_any_unwrap(py),))
285 })?;
286 }
287 Ok(())
288 }
289
290 fn dispatch_on_mark_price(&mut self, mark_price: MarkPriceUpdate) -> PyResult<()> {
291 if let Some(ref py_self) = self.py_self {
292 Python::attach(|py| {
293 py_self.call_method1(py, "on_mark_price", (mark_price.into_py_any_unwrap(py),))
294 })?;
295 }
296 Ok(())
297 }
298
299 fn dispatch_on_index_price(&mut self, index_price: IndexPriceUpdate) -> PyResult<()> {
300 if let Some(ref py_self) = self.py_self {
301 Python::attach(|py| {
302 py_self.call_method1(py, "on_index_price", (index_price.into_py_any_unwrap(py),))
303 })?;
304 }
305 Ok(())
306 }
307
308 fn dispatch_on_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> PyResult<()> {
309 if let Some(ref py_self) = self.py_self {
310 Python::attach(|py| {
311 py_self.call_method1(
312 py,
313 "on_funding_rate",
314 (funding_rate.into_py_any_unwrap(py),),
315 )
316 })?;
317 }
318 Ok(())
319 }
320
321 fn dispatch_on_instrument_status(&mut self, data: InstrumentStatus) -> PyResult<()> {
322 if let Some(ref py_self) = self.py_self {
323 Python::attach(|py| {
324 py_self.call_method1(py, "on_instrument_status", (data.into_py_any_unwrap(py),))
325 })?;
326 }
327 Ok(())
328 }
329
330 fn dispatch_on_instrument_close(&mut self, update: InstrumentClose) -> PyResult<()> {
331 if let Some(ref py_self) = self.py_self {
332 Python::attach(|py| {
333 py_self.call_method1(py, "on_instrument_close", (update.into_py_any_unwrap(py),))
334 })?;
335 }
336 Ok(())
337 }
338
339 fn dispatch_on_historical_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
340 if let Some(ref py_self) = self.py_self {
341 Python::attach(|py| py_self.call_method1(py, "on_historical_data", (data,)))?;
342 }
343 Ok(())
344 }
345
346 fn dispatch_on_historical_quotes(&mut self, quotes: Vec<QuoteTick>) -> PyResult<()> {
347 if let Some(ref py_self) = self.py_self {
348 Python::attach(|py| {
349 let py_quotes: Vec<_> = quotes
350 .into_iter()
351 .map(|q| q.into_py_any_unwrap(py))
352 .collect();
353 py_self.call_method1(py, "on_historical_quotes", (py_quotes,))
354 })?;
355 }
356 Ok(())
357 }
358
359 fn dispatch_on_historical_trades(&mut self, trades: Vec<TradeTick>) -> PyResult<()> {
360 if let Some(ref py_self) = self.py_self {
361 Python::attach(|py| {
362 let py_trades: Vec<_> = trades
363 .into_iter()
364 .map(|t| t.into_py_any_unwrap(py))
365 .collect();
366 py_self.call_method1(py, "on_historical_trades", (py_trades,))
367 })?;
368 }
369 Ok(())
370 }
371
372 fn dispatch_on_historical_bars(&mut self, bars: Vec<Bar>) -> PyResult<()> {
373 if let Some(ref py_self) = self.py_self {
374 Python::attach(|py| {
375 let py_bars: Vec<_> = bars.into_iter().map(|b| b.into_py_any_unwrap(py)).collect();
376 py_self.call_method1(py, "on_historical_bars", (py_bars,))
377 })?;
378 }
379 Ok(())
380 }
381
382 fn dispatch_on_historical_mark_prices(
383 &mut self,
384 mark_prices: Vec<MarkPriceUpdate>,
385 ) -> PyResult<()> {
386 if let Some(ref py_self) = self.py_self {
387 Python::attach(|py| {
388 let py_prices: Vec<_> = mark_prices
389 .into_iter()
390 .map(|p| p.into_py_any_unwrap(py))
391 .collect();
392 py_self.call_method1(py, "on_historical_mark_prices", (py_prices,))
393 })?;
394 }
395 Ok(())
396 }
397
398 fn dispatch_on_historical_index_prices(
399 &mut self,
400 index_prices: Vec<IndexPriceUpdate>,
401 ) -> PyResult<()> {
402 if let Some(ref py_self) = self.py_self {
403 Python::attach(|py| {
404 let py_prices: Vec<_> = index_prices
405 .into_iter()
406 .map(|p| p.into_py_any_unwrap(py))
407 .collect();
408 py_self.call_method1(py, "on_historical_index_prices", (py_prices,))
409 })?;
410 }
411 Ok(())
412 }
413
414 #[cfg(feature = "defi")]
415 fn dispatch_on_block(&mut self, block: Block) -> PyResult<()> {
416 if let Some(ref py_self) = self.py_self {
417 Python::attach(|py| {
418 py_self.call_method1(py, "on_block", (block.into_py_any_unwrap(py),))
419 })?;
420 }
421 Ok(())
422 }
423
424 #[cfg(feature = "defi")]
425 fn dispatch_on_pool(&mut self, pool: Pool) -> PyResult<()> {
426 if let Some(ref py_self) = self.py_self {
427 Python::attach(|py| {
428 py_self.call_method1(py, "on_pool", (pool.into_py_any_unwrap(py),))
429 })?;
430 }
431 Ok(())
432 }
433
434 #[cfg(feature = "defi")]
435 fn dispatch_on_pool_swap(&mut self, swap: PoolSwap) -> PyResult<()> {
436 if let Some(ref py_self) = self.py_self {
437 Python::attach(|py| {
438 py_self.call_method1(py, "on_pool_swap", (swap.into_py_any_unwrap(py),))
439 })?;
440 }
441 Ok(())
442 }
443
444 #[cfg(feature = "defi")]
445 fn dispatch_on_pool_liquidity_update(&mut self, update: PoolLiquidityUpdate) -> PyResult<()> {
446 if let Some(ref py_self) = self.py_self {
447 Python::attach(|py| {
448 py_self.call_method1(
449 py,
450 "on_pool_liquidity_update",
451 (update.into_py_any_unwrap(py),),
452 )
453 })?;
454 }
455 Ok(())
456 }
457
458 #[cfg(feature = "defi")]
459 fn dispatch_on_pool_fee_collect(&mut self, collect: PoolFeeCollect) -> PyResult<()> {
460 if let Some(ref py_self) = self.py_self {
461 Python::attach(|py| {
462 py_self.call_method1(py, "on_pool_fee_collect", (collect.into_py_any_unwrap(py),))
463 })?;
464 }
465 Ok(())
466 }
467
468 #[cfg(feature = "defi")]
469 fn dispatch_on_pool_flash(&mut self, flash: PoolFlash) -> PyResult<()> {
470 if let Some(ref py_self) = self.py_self {
471 Python::attach(|py| {
472 py_self.call_method1(py, "on_pool_flash", (flash.into_py_any_unwrap(py),))
473 })?;
474 }
475 Ok(())
476 }
477}
478
479fn dict_to_params(
480 py: Python<'_>,
481 params: Option<Py<PyDict>>,
482) -> PyResult<Option<nautilus_core::Params>> {
483 match params {
484 Some(dict) => from_pydict(py, dict),
485 None => Ok(None),
486 }
487}
488
489#[allow(non_camel_case_types)]
495#[pyo3::pyclass(
496 module = "nautilus_trader.common",
497 name = "DataActor",
498 unsendable,
499 subclass
500)]
501pub struct PyDataActor {
502 inner: Rc<UnsafeCell<PyDataActorInner>>,
503}
504
505impl Debug for PyDataActor {
506 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
507 f.debug_struct(stringify!(PyDataActor))
508 .field("inner", &self.inner())
509 .finish()
510 }
511}
512
513impl PyDataActor {
514 #[inline]
521 #[allow(unsafe_code)]
522 pub(crate) fn inner(&self) -> &PyDataActorInner {
523 unsafe { &*self.inner.get() }
524 }
525
526 #[inline]
533 #[allow(unsafe_code, clippy::mut_from_ref)]
534 pub(crate) fn inner_mut(&self) -> &mut PyDataActorInner {
535 unsafe { &mut *self.inner.get() }
536 }
537}
538
539impl Deref for PyDataActor {
540 type Target = DataActorCore;
541
542 fn deref(&self) -> &Self::Target {
543 &self.inner().core
544 }
545}
546
547impl DerefMut for PyDataActor {
548 fn deref_mut(&mut self) -> &mut Self::Target {
549 &mut self.inner_mut().core
550 }
551}
552
553impl PyDataActor {
554 pub fn new(config: Option<DataActorConfig>) -> Self {
556 let config = config.unwrap_or_default();
557 let core = DataActorCore::new(config);
558 let clock = PyClock::new_test(); let logger = PyLogger::new(core.actor_id().as_str());
560
561 let inner = PyDataActorInner {
562 core,
563 py_self: None,
564 clock,
565 logger,
566 };
567
568 Self {
569 inner: Rc::new(UnsafeCell::new(inner)),
570 }
571 }
572
573 pub fn set_python_instance(&mut self, py_obj: Py<PyAny>) {
580 self.inner_mut().py_self = Some(py_obj);
581 }
582
583 pub fn set_actor_id(&mut self, actor_id: ActorId) {
590 let inner = self.inner_mut();
591 inner.core.config.actor_id = Some(actor_id);
592 inner.core.actor_id = actor_id;
593 }
594
595 pub fn set_log_events(&mut self, log_events: bool) {
597 self.inner_mut().core.config.log_events = log_events;
598 }
599
600 pub fn set_log_commands(&mut self, log_commands: bool) {
602 self.inner_mut().core.config.log_commands = log_commands;
603 }
604
605 pub fn mem_address(&self) -> String {
607 self.inner().core.mem_address()
608 }
609
610 pub fn is_registered(&self) -> bool {
612 self.inner().core.is_registered()
613 }
614
615 pub fn register(
621 &mut self,
622 trader_id: TraderId,
623 clock: Rc<RefCell<dyn Clock>>,
624 cache: Rc<RefCell<Cache>>,
625 ) -> anyhow::Result<()> {
626 let inner = self.inner_mut();
627 inner.core.register(trader_id, clock, cache)?;
628
629 inner.clock = PyClock::from_rc(inner.core.clock_rc());
630
631 let actor_id = inner.actor_id().inner();
633 let callback = TimeEventCallback::from(move |event: TimeEvent| {
634 if let Some(mut actor) = try_get_actor_unchecked::<PyDataActorInner>(&actor_id) {
635 if let Err(e) = actor.on_time_event(&event) {
636 log::error!("Python time event handler failed for actor {actor_id}: {e}");
637 }
638 } else {
639 log::error!("Actor {actor_id} not found for time event handling");
640 }
641 });
642
643 inner.clock.inner_mut().register_default_handler(callback);
644
645 inner.initialize()
646 }
647
648 pub fn register_in_global_registries(&self) {
653 let inner = self.inner();
654 let component_id = inner.component_id().inner();
655 let actor_id = Actor::id(inner);
656
657 let inner_ref: Rc<UnsafeCell<PyDataActorInner>> = self.inner.clone();
658
659 let component_trait_ref: Rc<UnsafeCell<dyn Component>> = inner_ref.clone();
660 get_component_registry().insert(component_id, component_trait_ref);
661
662 let actor_trait_ref: Rc<UnsafeCell<dyn Actor>> = inner_ref;
663 get_actor_registry().insert(actor_id, actor_trait_ref);
664 }
665}
666
667impl DataActor for PyDataActorInner {
668 fn on_start(&mut self) -> anyhow::Result<()> {
669 self.dispatch_on_start()
670 .map_err(|e| anyhow::anyhow!("Python on_start failed: {e}"))
671 }
672
673 fn on_stop(&mut self) -> anyhow::Result<()> {
674 self.dispatch_on_stop()
675 .map_err(|e| anyhow::anyhow!("Python on_stop failed: {e}"))
676 }
677
678 fn on_resume(&mut self) -> anyhow::Result<()> {
679 self.dispatch_on_resume()
680 .map_err(|e| anyhow::anyhow!("Python on_resume failed: {e}"))
681 }
682
683 fn on_reset(&mut self) -> anyhow::Result<()> {
684 self.dispatch_on_reset()
685 .map_err(|e| anyhow::anyhow!("Python on_reset failed: {e}"))
686 }
687
688 fn on_dispose(&mut self) -> anyhow::Result<()> {
689 self.dispatch_on_dispose()
690 .map_err(|e| anyhow::anyhow!("Python on_dispose failed: {e}"))
691 }
692
693 fn on_degrade(&mut self) -> anyhow::Result<()> {
694 self.dispatch_on_degrade()
695 .map_err(|e| anyhow::anyhow!("Python on_degrade failed: {e}"))
696 }
697
698 fn on_fault(&mut self) -> anyhow::Result<()> {
699 self.dispatch_on_fault()
700 .map_err(|e| anyhow::anyhow!("Python on_fault failed: {e}"))
701 }
702
703 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
704 self.dispatch_on_time_event(event.clone())
705 .map_err(|e| anyhow::anyhow!("Python on_time_event failed: {e}"))
706 }
707
708 #[allow(unused_variables)]
709 fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
710 Python::attach(|py| {
711 let py_data = py.None();
714
715 self.dispatch_on_data(py_data)
716 .map_err(|e| anyhow::anyhow!("Python on_data failed: {e}"))
717 })
718 }
719
720 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
721 self.dispatch_on_signal(signal)
722 .map_err(|e| anyhow::anyhow!("Python on_signal failed: {e}"))
723 }
724
725 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
726 Python::attach(|py| {
727 let py_instrument = instrument_any_to_pyobject(py, instrument.clone())
728 .map_err(|e| anyhow::anyhow!("Failed to convert InstrumentAny to Python: {e}"))?;
729 self.dispatch_on_instrument(py_instrument)
730 .map_err(|e| anyhow::anyhow!("Python on_instrument failed: {e}"))
731 })
732 }
733
734 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
735 self.dispatch_on_quote(*quote)
736 .map_err(|e| anyhow::anyhow!("Python on_quote failed: {e}"))
737 }
738
739 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
740 self.dispatch_on_trade(*tick)
741 .map_err(|e| anyhow::anyhow!("Python on_trade failed: {e}"))
742 }
743
744 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
745 self.dispatch_on_bar(*bar)
746 .map_err(|e| anyhow::anyhow!("Python on_bar failed: {e}"))
747 }
748
749 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
750 self.dispatch_on_book_deltas(deltas.clone())
751 .map_err(|e| anyhow::anyhow!("Python on_book_deltas failed: {e}"))
752 }
753
754 fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
755 self.dispatch_on_book(order_book)
756 .map_err(|e| anyhow::anyhow!("Python on_book failed: {e}"))
757 }
758
759 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
760 self.dispatch_on_mark_price(*mark_price)
761 .map_err(|e| anyhow::anyhow!("Python on_mark_price failed: {e}"))
762 }
763
764 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
765 self.dispatch_on_index_price(*index_price)
766 .map_err(|e| anyhow::anyhow!("Python on_index_price failed: {e}"))
767 }
768
769 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
770 self.dispatch_on_funding_rate(*funding_rate)
771 .map_err(|e| anyhow::anyhow!("Python on_funding_rate failed: {e}"))
772 }
773
774 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
775 self.dispatch_on_instrument_status(*data)
776 .map_err(|e| anyhow::anyhow!("Python on_instrument_status failed: {e}"))
777 }
778
779 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
780 self.dispatch_on_instrument_close(*update)
781 .map_err(|e| anyhow::anyhow!("Python on_instrument_close failed: {e}"))
782 }
783
784 #[cfg(feature = "defi")]
785 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
786 self.dispatch_on_block(block.clone())
787 .map_err(|e| anyhow::anyhow!("Python on_block failed: {e}"))
788 }
789
790 #[cfg(feature = "defi")]
791 fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
792 self.dispatch_on_pool(pool.clone())
793 .map_err(|e| anyhow::anyhow!("Python on_pool failed: {e}"))
794 }
795
796 #[cfg(feature = "defi")]
797 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
798 self.dispatch_on_pool_swap(swap.clone())
799 .map_err(|e| anyhow::anyhow!("Python on_pool_swap failed: {e}"))
800 }
801
802 #[cfg(feature = "defi")]
803 fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
804 self.dispatch_on_pool_liquidity_update(update.clone())
805 .map_err(|e| anyhow::anyhow!("Python on_pool_liquidity_update failed: {e}"))
806 }
807
808 #[cfg(feature = "defi")]
809 fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
810 self.dispatch_on_pool_fee_collect(collect.clone())
811 .map_err(|e| anyhow::anyhow!("Python on_pool_fee_collect failed: {e}"))
812 }
813
814 #[cfg(feature = "defi")]
815 fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
816 self.dispatch_on_pool_flash(flash.clone())
817 .map_err(|e| anyhow::anyhow!("Python on_pool_flash failed: {e}"))
818 }
819
820 fn on_historical_data(&mut self, _data: &dyn Any) -> anyhow::Result<()> {
821 Python::attach(|py| {
822 let py_data = py.None();
823 self.dispatch_on_historical_data(py_data)
824 .map_err(|e| anyhow::anyhow!("Python on_historical_data failed: {e}"))
825 })
826 }
827
828 fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
829 self.dispatch_on_historical_quotes(quotes.to_vec())
830 .map_err(|e| anyhow::anyhow!("Python on_historical_quotes failed: {e}"))
831 }
832
833 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
834 self.dispatch_on_historical_trades(trades.to_vec())
835 .map_err(|e| anyhow::anyhow!("Python on_historical_trades failed: {e}"))
836 }
837
838 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
839 self.dispatch_on_historical_bars(bars.to_vec())
840 .map_err(|e| anyhow::anyhow!("Python on_historical_bars failed: {e}"))
841 }
842
843 fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
844 self.dispatch_on_historical_mark_prices(mark_prices.to_vec())
845 .map_err(|e| anyhow::anyhow!("Python on_historical_mark_prices failed: {e}"))
846 }
847
848 fn on_historical_index_prices(
849 &mut self,
850 index_prices: &[IndexPriceUpdate],
851 ) -> anyhow::Result<()> {
852 self.dispatch_on_historical_index_prices(index_prices.to_vec())
853 .map_err(|e| anyhow::anyhow!("Python on_historical_index_prices failed: {e}"))
854 }
855}
856
857#[pymethods]
858impl PyDataActor {
859 #[new]
860 #[pyo3(signature = (config=None))]
861 fn py_new(config: Option<DataActorConfig>) -> PyResult<Self> {
862 Ok(Self::new(config))
863 }
864
865 #[getter]
866 #[pyo3(name = "clock")]
867 fn py_clock(&self) -> PyResult<PyClock> {
868 let inner = self.inner();
869 if inner.core.is_registered() {
870 Ok(inner.clock.clone())
871 } else {
872 Err(to_pyruntime_err(
873 "Actor must be registered with a trader before accessing clock",
874 ))
875 }
876 }
877
878 #[getter]
879 #[pyo3(name = "cache")]
880 fn py_cache(&self) -> PyResult<PyCache> {
881 let inner = self.inner();
882 if inner.core.is_registered() {
883 Ok(PyCache::from_rc(inner.core.cache_rc()))
884 } else {
885 Err(to_pyruntime_err(
886 "Actor must be registered with a trader before accessing cache",
887 ))
888 }
889 }
890
891 #[getter]
892 #[pyo3(name = "log")]
893 fn py_log(&self) -> PyLogger {
894 self.inner().logger.clone()
895 }
896
897 #[getter]
898 #[pyo3(name = "actor_id")]
899 fn py_actor_id(&self) -> ActorId {
900 self.inner().core.actor_id
901 }
902
903 #[getter]
904 #[pyo3(name = "trader_id")]
905 fn py_trader_id(&self) -> Option<TraderId> {
906 self.inner().core.trader_id()
907 }
908
909 #[pyo3(name = "state")]
910 fn py_state(&self) -> ComponentState {
911 Component::state(self.inner())
912 }
913
914 #[pyo3(name = "is_ready")]
915 fn py_is_ready(&self) -> bool {
916 Component::is_ready(self.inner())
917 }
918
919 #[pyo3(name = "is_running")]
920 fn py_is_running(&self) -> bool {
921 Component::is_running(self.inner())
922 }
923
924 #[pyo3(name = "is_stopped")]
925 fn py_is_stopped(&self) -> bool {
926 Component::is_stopped(self.inner())
927 }
928
929 #[pyo3(name = "is_degraded")]
930 fn py_is_degraded(&self) -> bool {
931 Component::is_degraded(self.inner())
932 }
933
934 #[pyo3(name = "is_faulted")]
935 fn py_is_faulted(&self) -> bool {
936 Component::is_faulted(self.inner())
937 }
938
939 #[pyo3(name = "is_disposed")]
940 fn py_is_disposed(&self) -> bool {
941 Component::is_disposed(self.inner())
942 }
943
944 #[pyo3(name = "start")]
945 fn py_start(&mut self) -> PyResult<()> {
946 Component::start(self.inner_mut()).map_err(to_pyruntime_err)
947 }
948
949 #[pyo3(name = "stop")]
950 fn py_stop(&mut self) -> PyResult<()> {
951 Component::stop(self.inner_mut()).map_err(to_pyruntime_err)
952 }
953
954 #[pyo3(name = "resume")]
955 fn py_resume(&mut self) -> PyResult<()> {
956 Component::resume(self.inner_mut()).map_err(to_pyruntime_err)
957 }
958
959 #[pyo3(name = "reset")]
960 fn py_reset(&mut self) -> PyResult<()> {
961 Component::reset(self.inner_mut()).map_err(to_pyruntime_err)
962 }
963
964 #[pyo3(name = "dispose")]
965 fn py_dispose(&mut self) -> PyResult<()> {
966 Component::dispose(self.inner_mut()).map_err(to_pyruntime_err)
967 }
968
969 #[pyo3(name = "degrade")]
970 fn py_degrade(&mut self) -> PyResult<()> {
971 Component::degrade(self.inner_mut()).map_err(to_pyruntime_err)
972 }
973
974 #[pyo3(name = "fault")]
975 fn py_fault(&mut self) -> PyResult<()> {
976 Component::fault(self.inner_mut()).map_err(to_pyruntime_err)
977 }
978
979 #[pyo3(name = "shutdown_system")]
980 #[pyo3(signature = (reason=None))]
981 fn py_shutdown_system(&self, reason: Option<String>) -> PyResult<()> {
982 self.inner().core.shutdown_system(reason);
983 Ok(())
984 }
985
986 #[pyo3(name = "on_start")]
987 fn py_on_start(&self) -> PyResult<()> {
988 self.inner().dispatch_on_start()
989 }
990
991 #[pyo3(name = "on_stop")]
992 fn py_on_stop(&mut self) -> PyResult<()> {
993 self.inner_mut().dispatch_on_stop()
994 }
995
996 #[pyo3(name = "on_resume")]
997 fn py_on_resume(&mut self) -> PyResult<()> {
998 self.inner_mut().dispatch_on_resume()
999 }
1000
1001 #[pyo3(name = "on_reset")]
1002 fn py_on_reset(&mut self) -> PyResult<()> {
1003 self.inner_mut().dispatch_on_reset()
1004 }
1005
1006 #[pyo3(name = "on_dispose")]
1007 fn py_on_dispose(&mut self) -> PyResult<()> {
1008 self.inner_mut().dispatch_on_dispose()
1009 }
1010
1011 #[pyo3(name = "on_degrade")]
1012 fn py_on_degrade(&mut self) -> PyResult<()> {
1013 self.inner_mut().dispatch_on_degrade()
1014 }
1015
1016 #[pyo3(name = "on_fault")]
1017 fn py_on_fault(&mut self) -> PyResult<()> {
1018 self.inner_mut().dispatch_on_fault()
1019 }
1020
1021 #[pyo3(name = "on_time_event")]
1022 fn py_on_time_event(&mut self, event: TimeEvent) -> PyResult<()> {
1023 self.inner_mut().dispatch_on_time_event(event)
1024 }
1025
1026 #[pyo3(name = "on_data")]
1027 fn py_on_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
1028 self.inner_mut().dispatch_on_data(data)
1029 }
1030
1031 #[pyo3(name = "on_signal")]
1032 fn py_on_signal(&mut self, signal: &Signal) -> PyResult<()> {
1033 self.inner_mut().dispatch_on_signal(signal)
1034 }
1035
1036 #[pyo3(name = "on_instrument")]
1037 fn py_on_instrument(&mut self, instrument: Py<PyAny>) -> PyResult<()> {
1038 self.inner_mut().dispatch_on_instrument(instrument)
1039 }
1040
1041 #[pyo3(name = "on_quote")]
1042 fn py_on_quote(&mut self, quote: QuoteTick) -> PyResult<()> {
1043 self.inner_mut().dispatch_on_quote(quote)
1044 }
1045
1046 #[pyo3(name = "on_trade")]
1047 fn py_on_trade(&mut self, trade: TradeTick) -> PyResult<()> {
1048 self.inner_mut().dispatch_on_trade(trade)
1049 }
1050
1051 #[pyo3(name = "on_bar")]
1052 fn py_on_bar(&mut self, bar: Bar) -> PyResult<()> {
1053 self.inner_mut().dispatch_on_bar(bar)
1054 }
1055
1056 #[pyo3(name = "on_book_deltas")]
1057 fn py_on_book_deltas(&mut self, deltas: OrderBookDeltas) -> PyResult<()> {
1058 self.inner_mut().dispatch_on_book_deltas(deltas)
1059 }
1060
1061 #[pyo3(name = "on_book")]
1062 fn py_on_book(&mut self, book: &OrderBook) -> PyResult<()> {
1063 self.inner_mut().dispatch_on_book(book)
1064 }
1065
1066 #[pyo3(name = "on_mark_price")]
1067 fn py_on_mark_price(&mut self, mark_price: MarkPriceUpdate) -> PyResult<()> {
1068 self.inner_mut().dispatch_on_mark_price(mark_price)
1069 }
1070
1071 #[pyo3(name = "on_index_price")]
1072 fn py_on_index_price(&mut self, index_price: IndexPriceUpdate) -> PyResult<()> {
1073 self.inner_mut().dispatch_on_index_price(index_price)
1074 }
1075
1076 #[pyo3(name = "on_funding_rate")]
1077 fn py_on_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> PyResult<()> {
1078 self.inner_mut().dispatch_on_funding_rate(funding_rate)
1079 }
1080
1081 #[pyo3(name = "on_instrument_status")]
1082 fn py_on_instrument_status(&mut self, status: InstrumentStatus) -> PyResult<()> {
1083 self.inner_mut().dispatch_on_instrument_status(status)
1084 }
1085
1086 #[pyo3(name = "on_instrument_close")]
1087 fn py_on_instrument_close(&mut self, close: InstrumentClose) -> PyResult<()> {
1088 self.inner_mut().dispatch_on_instrument_close(close)
1089 }
1090
1091 #[cfg(feature = "defi")]
1092 #[pyo3(name = "on_block")]
1093 fn py_on_block(&mut self, block: Block) -> PyResult<()> {
1094 self.inner_mut().dispatch_on_block(block)
1095 }
1096
1097 #[cfg(feature = "defi")]
1098 #[pyo3(name = "on_pool")]
1099 fn py_on_pool(&mut self, pool: Pool) -> PyResult<()> {
1100 self.inner_mut().dispatch_on_pool(pool)
1101 }
1102
1103 #[cfg(feature = "defi")]
1104 #[pyo3(name = "on_pool_swap")]
1105 fn py_on_pool_swap(&mut self, swap: PoolSwap) -> PyResult<()> {
1106 self.inner_mut().dispatch_on_pool_swap(swap)
1107 }
1108
1109 #[cfg(feature = "defi")]
1110 #[pyo3(name = "on_pool_liquidity_update")]
1111 fn py_on_pool_liquidity_update(&mut self, update: PoolLiquidityUpdate) -> PyResult<()> {
1112 self.inner_mut().dispatch_on_pool_liquidity_update(update)
1113 }
1114
1115 #[cfg(feature = "defi")]
1116 #[pyo3(name = "on_pool_fee_collect")]
1117 fn py_on_pool_fee_collect(&mut self, update: PoolFeeCollect) -> PyResult<()> {
1118 self.inner_mut().dispatch_on_pool_fee_collect(update)
1119 }
1120
1121 #[cfg(feature = "defi")]
1122 #[pyo3(name = "on_pool_flash")]
1123 fn py_on_pool_flash(&mut self, flash: PoolFlash) -> PyResult<()> {
1124 self.inner_mut().dispatch_on_pool_flash(flash)
1125 }
1126
1127 #[pyo3(name = "subscribe_data")]
1128 #[pyo3(signature = (data_type, client_id=None, params=None))]
1129 fn py_subscribe_data(
1130 &mut self,
1131 py: Python<'_>,
1132 data_type: DataType,
1133 client_id: Option<ClientId>,
1134 params: Option<Py<PyDict>>,
1135 ) -> PyResult<()> {
1136 let params = dict_to_params(py, params)?;
1137 DataActor::subscribe_data(self.inner_mut(), data_type, client_id, params);
1138 Ok(())
1139 }
1140
1141 #[pyo3(name = "subscribe_instruments")]
1142 #[pyo3(signature = (venue, client_id=None, params=None))]
1143 fn py_subscribe_instruments(
1144 &mut self,
1145 py: Python<'_>,
1146 venue: Venue,
1147 client_id: Option<ClientId>,
1148 params: Option<Py<PyDict>>,
1149 ) -> PyResult<()> {
1150 let params = dict_to_params(py, params)?;
1151 DataActor::subscribe_instruments(self.inner_mut(), venue, client_id, params);
1152 Ok(())
1153 }
1154
1155 #[pyo3(name = "subscribe_instrument")]
1156 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1157 fn py_subscribe_instrument(
1158 &mut self,
1159 py: Python<'_>,
1160 instrument_id: InstrumentId,
1161 client_id: Option<ClientId>,
1162 params: Option<Py<PyDict>>,
1163 ) -> PyResult<()> {
1164 let params = dict_to_params(py, params)?;
1165 DataActor::subscribe_instrument(self.inner_mut(), instrument_id, client_id, params);
1166 Ok(())
1167 }
1168
1169 #[pyo3(name = "subscribe_book_deltas")]
1170 #[pyo3(signature = (instrument_id, book_type, depth=None, client_id=None, managed=false, params=None))]
1171 #[allow(clippy::too_many_arguments)]
1172 fn py_subscribe_book_deltas(
1173 &mut self,
1174 py: Python<'_>,
1175 instrument_id: InstrumentId,
1176 book_type: BookType,
1177 depth: Option<usize>,
1178 client_id: Option<ClientId>,
1179 managed: bool,
1180 params: Option<Py<PyDict>>,
1181 ) -> PyResult<()> {
1182 let params = dict_to_params(py, params)?;
1183 let depth = depth.and_then(NonZeroUsize::new);
1184 DataActor::subscribe_book_deltas(
1185 self.inner_mut(),
1186 instrument_id,
1187 book_type,
1188 depth,
1189 client_id,
1190 managed,
1191 params,
1192 );
1193 Ok(())
1194 }
1195
1196 #[pyo3(name = "subscribe_book_at_interval")]
1197 #[pyo3(signature = (instrument_id, book_type, interval_ms, depth=None, client_id=None, params=None))]
1198 #[allow(clippy::too_many_arguments)]
1199 fn py_subscribe_book_at_interval(
1200 &mut self,
1201 py: Python<'_>,
1202 instrument_id: InstrumentId,
1203 book_type: BookType,
1204 interval_ms: usize,
1205 depth: Option<usize>,
1206 client_id: Option<ClientId>,
1207 params: Option<Py<PyDict>>,
1208 ) -> PyResult<()> {
1209 let params = dict_to_params(py, params)?;
1210 let depth = depth.and_then(NonZeroUsize::new);
1211 let interval_ms = NonZeroUsize::new(interval_ms)
1212 .ok_or_else(|| to_pyvalue_err("interval_ms must be > 0"))?;
1213
1214 DataActor::subscribe_book_at_interval(
1215 self.inner_mut(),
1216 instrument_id,
1217 book_type,
1218 depth,
1219 interval_ms,
1220 client_id,
1221 params,
1222 );
1223 Ok(())
1224 }
1225
1226 #[pyo3(name = "subscribe_quotes")]
1227 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1228 fn py_subscribe_quotes(
1229 &mut self,
1230 py: Python<'_>,
1231 instrument_id: InstrumentId,
1232 client_id: Option<ClientId>,
1233 params: Option<Py<PyDict>>,
1234 ) -> PyResult<()> {
1235 let params = dict_to_params(py, params)?;
1236 DataActor::subscribe_quotes(self.inner_mut(), instrument_id, client_id, params);
1237 Ok(())
1238 }
1239
1240 #[pyo3(name = "subscribe_trades")]
1241 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1242 fn py_subscribe_trades(
1243 &mut self,
1244 py: Python<'_>,
1245 instrument_id: InstrumentId,
1246 client_id: Option<ClientId>,
1247 params: Option<Py<PyDict>>,
1248 ) -> PyResult<()> {
1249 let params = dict_to_params(py, params)?;
1250 DataActor::subscribe_trades(self.inner_mut(), instrument_id, client_id, params);
1251 Ok(())
1252 }
1253
1254 #[pyo3(name = "subscribe_bars")]
1255 #[pyo3(signature = (bar_type, client_id=None, params=None))]
1256 fn py_subscribe_bars(
1257 &mut self,
1258 py: Python<'_>,
1259 bar_type: BarType,
1260 client_id: Option<ClientId>,
1261 params: Option<Py<PyDict>>,
1262 ) -> PyResult<()> {
1263 let params = dict_to_params(py, params)?;
1264 DataActor::subscribe_bars(self.inner_mut(), bar_type, client_id, params);
1265 Ok(())
1266 }
1267
1268 #[pyo3(name = "subscribe_mark_prices")]
1269 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1270 fn py_subscribe_mark_prices(
1271 &mut self,
1272 py: Python<'_>,
1273 instrument_id: InstrumentId,
1274 client_id: Option<ClientId>,
1275 params: Option<Py<PyDict>>,
1276 ) -> PyResult<()> {
1277 let params = dict_to_params(py, params)?;
1278 DataActor::subscribe_mark_prices(self.inner_mut(), instrument_id, client_id, params);
1279 Ok(())
1280 }
1281
1282 #[pyo3(name = "subscribe_index_prices")]
1283 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1284 fn py_subscribe_index_prices(
1285 &mut self,
1286 py: Python<'_>,
1287 instrument_id: InstrumentId,
1288 client_id: Option<ClientId>,
1289 params: Option<Py<PyDict>>,
1290 ) -> PyResult<()> {
1291 let params = dict_to_params(py, params)?;
1292 DataActor::subscribe_index_prices(self.inner_mut(), instrument_id, client_id, params);
1293 Ok(())
1294 }
1295
1296 #[pyo3(name = "subscribe_funding_rates")]
1297 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1298 fn py_subscribe_funding_rates(
1299 &mut self,
1300 py: Python<'_>,
1301 instrument_id: InstrumentId,
1302 client_id: Option<ClientId>,
1303 params: Option<Py<PyDict>>,
1304 ) -> PyResult<()> {
1305 let params = dict_to_params(py, params)?;
1306 DataActor::subscribe_funding_rates(self.inner_mut(), instrument_id, client_id, params);
1307 Ok(())
1308 }
1309
1310 #[pyo3(name = "subscribe_instrument_status")]
1311 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1312 fn py_subscribe_instrument_status(
1313 &mut self,
1314 py: Python<'_>,
1315 instrument_id: InstrumentId,
1316 client_id: Option<ClientId>,
1317 params: Option<Py<PyDict>>,
1318 ) -> PyResult<()> {
1319 let params = dict_to_params(py, params)?;
1320 DataActor::subscribe_instrument_status(self.inner_mut(), instrument_id, client_id, params);
1321 Ok(())
1322 }
1323
1324 #[pyo3(name = "subscribe_instrument_close")]
1325 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1326 fn py_subscribe_instrument_close(
1327 &mut self,
1328 py: Python<'_>,
1329 instrument_id: InstrumentId,
1330 client_id: Option<ClientId>,
1331 params: Option<Py<PyDict>>,
1332 ) -> PyResult<()> {
1333 let params = dict_to_params(py, params)?;
1334 DataActor::subscribe_instrument_close(self.inner_mut(), instrument_id, client_id, params);
1335 Ok(())
1336 }
1337
1338 #[pyo3(name = "subscribe_order_fills")]
1339 #[pyo3(signature = (instrument_id))]
1340 fn py_subscribe_order_fills(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1341 DataActor::subscribe_order_fills(self.inner_mut(), instrument_id);
1342 Ok(())
1343 }
1344
1345 #[pyo3(name = "subscribe_order_cancels")]
1346 #[pyo3(signature = (instrument_id))]
1347 fn py_subscribe_order_cancels(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1348 DataActor::subscribe_order_cancels(self.inner_mut(), instrument_id);
1349 Ok(())
1350 }
1351
1352 #[cfg(feature = "defi")]
1353 #[pyo3(name = "subscribe_blocks")]
1354 #[pyo3(signature = (chain, client_id=None, params=None))]
1355 fn py_subscribe_blocks(
1356 &mut self,
1357 py: Python<'_>,
1358 chain: Blockchain,
1359 client_id: Option<ClientId>,
1360 params: Option<Py<PyDict>>,
1361 ) -> PyResult<()> {
1362 let params = dict_to_params(py, params)?;
1363 DataActor::subscribe_blocks(self.inner_mut(), chain, client_id, params);
1364 Ok(())
1365 }
1366
1367 #[cfg(feature = "defi")]
1368 #[pyo3(name = "subscribe_pool")]
1369 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1370 fn py_subscribe_pool(
1371 &mut self,
1372 py: Python<'_>,
1373 instrument_id: InstrumentId,
1374 client_id: Option<ClientId>,
1375 params: Option<Py<PyDict>>,
1376 ) -> PyResult<()> {
1377 let params = dict_to_params(py, params)?;
1378 DataActor::subscribe_pool(self.inner_mut(), instrument_id, client_id, params);
1379 Ok(())
1380 }
1381
1382 #[cfg(feature = "defi")]
1383 #[pyo3(name = "subscribe_pool_swaps")]
1384 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1385 fn py_subscribe_pool_swaps(
1386 &mut self,
1387 py: Python<'_>,
1388 instrument_id: InstrumentId,
1389 client_id: Option<ClientId>,
1390 params: Option<Py<PyDict>>,
1391 ) -> PyResult<()> {
1392 let params = dict_to_params(py, params)?;
1393 DataActor::subscribe_pool_swaps(self.inner_mut(), instrument_id, client_id, params);
1394 Ok(())
1395 }
1396
1397 #[cfg(feature = "defi")]
1398 #[pyo3(name = "subscribe_pool_liquidity_updates")]
1399 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1400 fn py_subscribe_pool_liquidity_updates(
1401 &mut self,
1402 py: Python<'_>,
1403 instrument_id: InstrumentId,
1404 client_id: Option<ClientId>,
1405 params: Option<Py<PyDict>>,
1406 ) -> PyResult<()> {
1407 let params = dict_to_params(py, params)?;
1408 DataActor::subscribe_pool_liquidity_updates(
1409 self.inner_mut(),
1410 instrument_id,
1411 client_id,
1412 params,
1413 );
1414 Ok(())
1415 }
1416
1417 #[cfg(feature = "defi")]
1418 #[pyo3(name = "subscribe_pool_fee_collects")]
1419 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1420 fn py_subscribe_pool_fee_collects(
1421 &mut self,
1422 py: Python<'_>,
1423 instrument_id: InstrumentId,
1424 client_id: Option<ClientId>,
1425 params: Option<Py<PyDict>>,
1426 ) -> PyResult<()> {
1427 let params = dict_to_params(py, params)?;
1428 DataActor::subscribe_pool_fee_collects(self.inner_mut(), instrument_id, client_id, params);
1429 Ok(())
1430 }
1431
1432 #[cfg(feature = "defi")]
1433 #[pyo3(name = "subscribe_pool_flash_events")]
1434 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1435 fn py_subscribe_pool_flash_events(
1436 &mut self,
1437 py: Python<'_>,
1438 instrument_id: InstrumentId,
1439 client_id: Option<ClientId>,
1440 params: Option<Py<PyDict>>,
1441 ) -> PyResult<()> {
1442 let params = dict_to_params(py, params)?;
1443 DataActor::subscribe_pool_flash_events(self.inner_mut(), instrument_id, client_id, params);
1444 Ok(())
1445 }
1446
1447 #[pyo3(name = "request_data")]
1448 #[pyo3(signature = (data_type, client_id, start=None, end=None, limit=None, params=None))]
1449 #[allow(clippy::too_many_arguments)]
1450 fn py_request_data(
1451 &mut self,
1452 py: Python<'_>,
1453 data_type: DataType,
1454 client_id: ClientId,
1455 start: Option<u64>,
1456 end: Option<u64>,
1457 limit: Option<usize>,
1458 params: Option<Py<PyDict>>,
1459 ) -> PyResult<String> {
1460 let params = dict_to_params(py, params)?;
1461 let limit = limit.and_then(NonZeroUsize::new);
1462 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1463 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1464
1465 let request_id = DataActor::request_data(
1466 self.inner_mut(),
1467 data_type,
1468 client_id,
1469 start,
1470 end,
1471 limit,
1472 params,
1473 )
1474 .map_err(to_pyvalue_err)?;
1475 Ok(request_id.to_string())
1476 }
1477
1478 #[pyo3(name = "request_instrument")]
1479 #[pyo3(signature = (instrument_id, start=None, end=None, client_id=None, params=None))]
1480 fn py_request_instrument(
1481 &mut self,
1482 py: Python<'_>,
1483 instrument_id: InstrumentId,
1484 start: Option<u64>,
1485 end: Option<u64>,
1486 client_id: Option<ClientId>,
1487 params: Option<Py<PyDict>>,
1488 ) -> PyResult<String> {
1489 let params = dict_to_params(py, params)?;
1490 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1491 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1492
1493 let request_id = DataActor::request_instrument(
1494 self.inner_mut(),
1495 instrument_id,
1496 start,
1497 end,
1498 client_id,
1499 params,
1500 )
1501 .map_err(to_pyvalue_err)?;
1502 Ok(request_id.to_string())
1503 }
1504
1505 #[pyo3(name = "request_instruments")]
1506 #[pyo3(signature = (venue=None, start=None, end=None, client_id=None, params=None))]
1507 fn py_request_instruments(
1508 &mut self,
1509 py: Python<'_>,
1510 venue: Option<Venue>,
1511 start: Option<u64>,
1512 end: Option<u64>,
1513 client_id: Option<ClientId>,
1514 params: Option<Py<PyDict>>,
1515 ) -> PyResult<String> {
1516 let params = dict_to_params(py, params)?;
1517 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1518 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1519
1520 let request_id =
1521 DataActor::request_instruments(self.inner_mut(), venue, start, end, client_id, params)
1522 .map_err(to_pyvalue_err)?;
1523 Ok(request_id.to_string())
1524 }
1525
1526 #[pyo3(name = "request_book_snapshot")]
1527 #[pyo3(signature = (instrument_id, depth=None, client_id=None, params=None))]
1528 fn py_request_book_snapshot(
1529 &mut self,
1530 py: Python<'_>,
1531 instrument_id: InstrumentId,
1532 depth: Option<usize>,
1533 client_id: Option<ClientId>,
1534 params: Option<Py<PyDict>>,
1535 ) -> PyResult<String> {
1536 let params = dict_to_params(py, params)?;
1537 let depth = depth.and_then(NonZeroUsize::new);
1538
1539 let request_id = DataActor::request_book_snapshot(
1540 self.inner_mut(),
1541 instrument_id,
1542 depth,
1543 client_id,
1544 params,
1545 )
1546 .map_err(to_pyvalue_err)?;
1547 Ok(request_id.to_string())
1548 }
1549
1550 #[pyo3(name = "request_quotes")]
1551 #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1552 #[allow(clippy::too_many_arguments)]
1553 fn py_request_quotes(
1554 &mut self,
1555 py: Python<'_>,
1556 instrument_id: InstrumentId,
1557 start: Option<u64>,
1558 end: Option<u64>,
1559 limit: Option<usize>,
1560 client_id: Option<ClientId>,
1561 params: Option<Py<PyDict>>,
1562 ) -> PyResult<String> {
1563 let params = dict_to_params(py, params)?;
1564 let limit = limit.and_then(NonZeroUsize::new);
1565 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1566 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1567
1568 let request_id = DataActor::request_quotes(
1569 self.inner_mut(),
1570 instrument_id,
1571 start,
1572 end,
1573 limit,
1574 client_id,
1575 params,
1576 )
1577 .map_err(to_pyvalue_err)?;
1578 Ok(request_id.to_string())
1579 }
1580
1581 #[pyo3(name = "request_trades")]
1582 #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1583 #[allow(clippy::too_many_arguments)]
1584 fn py_request_trades(
1585 &mut self,
1586 py: Python<'_>,
1587 instrument_id: InstrumentId,
1588 start: Option<u64>,
1589 end: Option<u64>,
1590 limit: Option<usize>,
1591 client_id: Option<ClientId>,
1592 params: Option<Py<PyDict>>,
1593 ) -> PyResult<String> {
1594 let params = dict_to_params(py, params)?;
1595 let limit = limit.and_then(NonZeroUsize::new);
1596 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1597 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1598
1599 let request_id = DataActor::request_trades(
1600 self.inner_mut(),
1601 instrument_id,
1602 start,
1603 end,
1604 limit,
1605 client_id,
1606 params,
1607 )
1608 .map_err(to_pyvalue_err)?;
1609 Ok(request_id.to_string())
1610 }
1611
1612 #[pyo3(name = "request_bars")]
1613 #[pyo3(signature = (bar_type, start=None, end=None, limit=None, client_id=None, params=None))]
1614 #[allow(clippy::too_many_arguments)]
1615 fn py_request_bars(
1616 &mut self,
1617 py: Python<'_>,
1618 bar_type: BarType,
1619 start: Option<u64>,
1620 end: Option<u64>,
1621 limit: Option<usize>,
1622 client_id: Option<ClientId>,
1623 params: Option<Py<PyDict>>,
1624 ) -> PyResult<String> {
1625 let params = dict_to_params(py, params)?;
1626 let limit = limit.and_then(NonZeroUsize::new);
1627 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1628 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1629
1630 let request_id = DataActor::request_bars(
1631 self.inner_mut(),
1632 bar_type,
1633 start,
1634 end,
1635 limit,
1636 client_id,
1637 params,
1638 )
1639 .map_err(to_pyvalue_err)?;
1640 Ok(request_id.to_string())
1641 }
1642
1643 #[pyo3(name = "unsubscribe_data")]
1644 #[pyo3(signature = (data_type, client_id=None, params=None))]
1645 fn py_unsubscribe_data(
1646 &mut self,
1647 py: Python<'_>,
1648 data_type: DataType,
1649 client_id: Option<ClientId>,
1650 params: Option<Py<PyDict>>,
1651 ) -> PyResult<()> {
1652 let params = dict_to_params(py, params)?;
1653 DataActor::unsubscribe_data(self.inner_mut(), data_type, client_id, params);
1654 Ok(())
1655 }
1656
1657 #[pyo3(name = "unsubscribe_instruments")]
1658 #[pyo3(signature = (venue, client_id=None, params=None))]
1659 fn py_unsubscribe_instruments(
1660 &mut self,
1661 py: Python<'_>,
1662 venue: Venue,
1663 client_id: Option<ClientId>,
1664 params: Option<Py<PyDict>>,
1665 ) -> PyResult<()> {
1666 let params = dict_to_params(py, params)?;
1667 DataActor::unsubscribe_instruments(self.inner_mut(), venue, client_id, params);
1668 Ok(())
1669 }
1670
1671 #[pyo3(name = "unsubscribe_instrument")]
1672 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1673 fn py_unsubscribe_instrument(
1674 &mut self,
1675 py: Python<'_>,
1676 instrument_id: InstrumentId,
1677 client_id: Option<ClientId>,
1678 params: Option<Py<PyDict>>,
1679 ) -> PyResult<()> {
1680 let params = dict_to_params(py, params)?;
1681 DataActor::unsubscribe_instrument(self.inner_mut(), instrument_id, client_id, params);
1682 Ok(())
1683 }
1684
1685 #[pyo3(name = "unsubscribe_book_deltas")]
1686 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1687 fn py_unsubscribe_book_deltas(
1688 &mut self,
1689 py: Python<'_>,
1690 instrument_id: InstrumentId,
1691 client_id: Option<ClientId>,
1692 params: Option<Py<PyDict>>,
1693 ) -> PyResult<()> {
1694 let params = dict_to_params(py, params)?;
1695 DataActor::unsubscribe_book_deltas(self.inner_mut(), instrument_id, client_id, params);
1696 Ok(())
1697 }
1698
1699 #[pyo3(name = "unsubscribe_book_at_interval")]
1700 #[pyo3(signature = (instrument_id, interval_ms, client_id=None, params=None))]
1701 fn py_unsubscribe_book_at_interval(
1702 &mut self,
1703 py: Python<'_>,
1704 instrument_id: InstrumentId,
1705 interval_ms: usize,
1706 client_id: Option<ClientId>,
1707 params: Option<Py<PyDict>>,
1708 ) -> PyResult<()> {
1709 let params = dict_to_params(py, params)?;
1710 let interval_ms = NonZeroUsize::new(interval_ms)
1711 .ok_or_else(|| to_pyvalue_err("interval_ms must be > 0"))?;
1712
1713 DataActor::unsubscribe_book_at_interval(
1714 self.inner_mut(),
1715 instrument_id,
1716 interval_ms,
1717 client_id,
1718 params,
1719 );
1720 Ok(())
1721 }
1722
1723 #[pyo3(name = "unsubscribe_quotes")]
1724 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1725 fn py_unsubscribe_quotes(
1726 &mut self,
1727 py: Python<'_>,
1728 instrument_id: InstrumentId,
1729 client_id: Option<ClientId>,
1730 params: Option<Py<PyDict>>,
1731 ) -> PyResult<()> {
1732 let params = dict_to_params(py, params)?;
1733 DataActor::unsubscribe_quotes(self.inner_mut(), instrument_id, client_id, params);
1734 Ok(())
1735 }
1736
1737 #[pyo3(name = "unsubscribe_trades")]
1738 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1739 fn py_unsubscribe_trades(
1740 &mut self,
1741 py: Python<'_>,
1742 instrument_id: InstrumentId,
1743 client_id: Option<ClientId>,
1744 params: Option<Py<PyDict>>,
1745 ) -> PyResult<()> {
1746 let params = dict_to_params(py, params)?;
1747 DataActor::unsubscribe_trades(self.inner_mut(), instrument_id, client_id, params);
1748 Ok(())
1749 }
1750
1751 #[pyo3(name = "unsubscribe_bars")]
1752 #[pyo3(signature = (bar_type, client_id=None, params=None))]
1753 fn py_unsubscribe_bars(
1754 &mut self,
1755 py: Python<'_>,
1756 bar_type: BarType,
1757 client_id: Option<ClientId>,
1758 params: Option<Py<PyDict>>,
1759 ) -> PyResult<()> {
1760 let params = dict_to_params(py, params)?;
1761 DataActor::unsubscribe_bars(self.inner_mut(), bar_type, client_id, params);
1762 Ok(())
1763 }
1764
1765 #[pyo3(name = "unsubscribe_mark_prices")]
1766 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1767 fn py_unsubscribe_mark_prices(
1768 &mut self,
1769 py: Python<'_>,
1770 instrument_id: InstrumentId,
1771 client_id: Option<ClientId>,
1772 params: Option<Py<PyDict>>,
1773 ) -> PyResult<()> {
1774 let params = dict_to_params(py, params)?;
1775 DataActor::unsubscribe_mark_prices(self.inner_mut(), instrument_id, client_id, params);
1776 Ok(())
1777 }
1778
1779 #[pyo3(name = "unsubscribe_index_prices")]
1780 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1781 fn py_unsubscribe_index_prices(
1782 &mut self,
1783 py: Python<'_>,
1784 instrument_id: InstrumentId,
1785 client_id: Option<ClientId>,
1786 params: Option<Py<PyDict>>,
1787 ) -> PyResult<()> {
1788 let params = dict_to_params(py, params)?;
1789 DataActor::unsubscribe_index_prices(self.inner_mut(), instrument_id, client_id, params);
1790 Ok(())
1791 }
1792
1793 #[pyo3(name = "unsubscribe_instrument_status")]
1794 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1795 fn py_unsubscribe_instrument_status(
1796 &mut self,
1797 py: Python<'_>,
1798 instrument_id: InstrumentId,
1799 client_id: Option<ClientId>,
1800 params: Option<Py<PyDict>>,
1801 ) -> PyResult<()> {
1802 let params = dict_to_params(py, params)?;
1803 DataActor::unsubscribe_instrument_status(
1804 self.inner_mut(),
1805 instrument_id,
1806 client_id,
1807 params,
1808 );
1809 Ok(())
1810 }
1811
1812 #[pyo3(name = "unsubscribe_instrument_close")]
1813 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1814 fn py_unsubscribe_instrument_close(
1815 &mut self,
1816 py: Python<'_>,
1817 instrument_id: InstrumentId,
1818 client_id: Option<ClientId>,
1819 params: Option<Py<PyDict>>,
1820 ) -> PyResult<()> {
1821 let params = dict_to_params(py, params)?;
1822 DataActor::unsubscribe_instrument_close(self.inner_mut(), instrument_id, client_id, params);
1823 Ok(())
1824 }
1825
1826 #[pyo3(name = "unsubscribe_order_fills")]
1827 #[pyo3(signature = (instrument_id))]
1828 fn py_unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1829 DataActor::unsubscribe_order_fills(self.inner_mut(), instrument_id);
1830 Ok(())
1831 }
1832
1833 #[pyo3(name = "unsubscribe_order_cancels")]
1834 #[pyo3(signature = (instrument_id))]
1835 fn py_unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1836 DataActor::unsubscribe_order_cancels(self.inner_mut(), instrument_id);
1837 Ok(())
1838 }
1839
1840 #[cfg(feature = "defi")]
1841 #[pyo3(name = "unsubscribe_blocks")]
1842 #[pyo3(signature = (chain, client_id=None, params=None))]
1843 fn py_unsubscribe_blocks(
1844 &mut self,
1845 py: Python<'_>,
1846 chain: Blockchain,
1847 client_id: Option<ClientId>,
1848 params: Option<Py<PyDict>>,
1849 ) -> PyResult<()> {
1850 let params = dict_to_params(py, params)?;
1851 DataActor::unsubscribe_blocks(self.inner_mut(), chain, client_id, params);
1852 Ok(())
1853 }
1854
1855 #[cfg(feature = "defi")]
1856 #[pyo3(name = "unsubscribe_pool")]
1857 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1858 fn py_unsubscribe_pool(
1859 &mut self,
1860 py: Python<'_>,
1861 instrument_id: InstrumentId,
1862 client_id: Option<ClientId>,
1863 params: Option<Py<PyDict>>,
1864 ) -> PyResult<()> {
1865 let params = dict_to_params(py, params)?;
1866 DataActor::unsubscribe_pool(self.inner_mut(), instrument_id, client_id, params);
1867 Ok(())
1868 }
1869
1870 #[cfg(feature = "defi")]
1871 #[pyo3(name = "unsubscribe_pool_swaps")]
1872 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1873 fn py_unsubscribe_pool_swaps(
1874 &mut self,
1875 py: Python<'_>,
1876 instrument_id: InstrumentId,
1877 client_id: Option<ClientId>,
1878 params: Option<Py<PyDict>>,
1879 ) -> PyResult<()> {
1880 let params = dict_to_params(py, params)?;
1881 DataActor::unsubscribe_pool_swaps(self.inner_mut(), instrument_id, client_id, params);
1882 Ok(())
1883 }
1884
1885 #[cfg(feature = "defi")]
1886 #[pyo3(name = "unsubscribe_pool_liquidity_updates")]
1887 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1888 fn py_unsubscribe_pool_liquidity_updates(
1889 &mut self,
1890 py: Python<'_>,
1891 instrument_id: InstrumentId,
1892 client_id: Option<ClientId>,
1893 params: Option<Py<PyDict>>,
1894 ) -> PyResult<()> {
1895 let params = dict_to_params(py, params)?;
1896 DataActor::unsubscribe_pool_liquidity_updates(
1897 self.inner_mut(),
1898 instrument_id,
1899 client_id,
1900 params,
1901 );
1902 Ok(())
1903 }
1904
1905 #[cfg(feature = "defi")]
1906 #[pyo3(name = "unsubscribe_pool_fee_collects")]
1907 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1908 fn py_unsubscribe_pool_fee_collects(
1909 &mut self,
1910 py: Python<'_>,
1911 instrument_id: InstrumentId,
1912 client_id: Option<ClientId>,
1913 params: Option<Py<PyDict>>,
1914 ) -> PyResult<()> {
1915 let params = dict_to_params(py, params)?;
1916 DataActor::unsubscribe_pool_fee_collects(
1917 self.inner_mut(),
1918 instrument_id,
1919 client_id,
1920 params,
1921 );
1922 Ok(())
1923 }
1924
1925 #[cfg(feature = "defi")]
1926 #[pyo3(name = "unsubscribe_pool_flash_events")]
1927 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1928 fn py_unsubscribe_pool_flash_events(
1929 &mut self,
1930 py: Python<'_>,
1931 instrument_id: InstrumentId,
1932 client_id: Option<ClientId>,
1933 params: Option<Py<PyDict>>,
1934 ) -> PyResult<()> {
1935 let params = dict_to_params(py, params)?;
1936 DataActor::unsubscribe_pool_flash_events(
1937 self.inner_mut(),
1938 instrument_id,
1939 client_id,
1940 params,
1941 );
1942 Ok(())
1943 }
1944
1945 #[allow(unused_variables)]
1946 #[pyo3(name = "on_historical_data")]
1947 fn py_on_historical_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
1948 Ok(())
1950 }
1951
1952 #[allow(unused_variables)]
1953 #[pyo3(name = "on_historical_quotes")]
1954 fn py_on_historical_quotes(&mut self, quotes: Vec<QuoteTick>) -> PyResult<()> {
1955 Ok(())
1957 }
1958
1959 #[allow(unused_variables)]
1960 #[pyo3(name = "on_historical_trades")]
1961 fn py_on_historical_trades(&mut self, trades: Vec<TradeTick>) -> PyResult<()> {
1962 Ok(())
1964 }
1965
1966 #[allow(unused_variables)]
1967 #[pyo3(name = "on_historical_bars")]
1968 fn py_on_historical_bars(&mut self, bars: Vec<Bar>) -> PyResult<()> {
1969 Ok(())
1971 }
1972
1973 #[allow(unused_variables)]
1974 #[pyo3(name = "on_historical_mark_prices")]
1975 fn py_on_historical_mark_prices(&mut self, mark_prices: Vec<MarkPriceUpdate>) -> PyResult<()> {
1976 Ok(())
1978 }
1979
1980 #[allow(unused_variables)]
1981 #[pyo3(name = "on_historical_index_prices")]
1982 fn py_on_historical_index_prices(
1983 &mut self,
1984 index_prices: Vec<IndexPriceUpdate>,
1985 ) -> PyResult<()> {
1986 Ok(())
1988 }
1989}
1990
1991#[cfg(test)]
1992mod tests {
1993 use std::{
1994 any::Any,
1995 cell::RefCell,
1996 collections::HashMap,
1997 ops::{Deref, DerefMut},
1998 rc::Rc,
1999 str::FromStr,
2000 sync::{Arc, Mutex},
2001 };
2002
2003 #[cfg(feature = "defi")]
2004 use alloy_primitives::{I256, U160};
2005 use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
2006 #[cfg(feature = "defi")]
2007 use nautilus_model::defi::{
2008 AmmType, Block, Blockchain, Chain, Dex, DexType, Pool, PoolIdentifier, PoolLiquidityUpdate,
2009 PoolSwap, Token,
2010 };
2011 use nautilus_model::{
2012 data::{
2013 Bar, BarType, DataType, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate,
2014 OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
2015 },
2016 enums::{AggressorSide, BookType, InstrumentCloseType, MarketStatusAction},
2017 identifiers::{ClientId, TradeId, TraderId, Venue},
2018 instruments::{CurrencyPair, InstrumentAny, stubs::audusd_sim},
2019 orderbook::OrderBook,
2020 types::{Price, Quantity},
2021 };
2022 use pyo3::{Py, PyAny, PyResult, Python, ffi::c_str, types::PyAnyMethods};
2023 use rstest::{fixture, rstest};
2024 use ustr::Ustr;
2025
2026 use super::PyDataActor;
2027 use crate::{
2028 actor::{DataActor, data_actor::DataActorCore},
2029 cache::Cache,
2030 clock::TestClock,
2031 component::Component,
2032 enums::ComponentState,
2033 runner::{SyncDataCommandSender, set_data_cmd_sender},
2034 signal::Signal,
2035 timer::TimeEvent,
2036 };
2037
2038 #[fixture]
2039 fn clock() -> Rc<RefCell<TestClock>> {
2040 Rc::new(RefCell::new(TestClock::new()))
2041 }
2042
2043 #[fixture]
2044 fn cache() -> Rc<RefCell<Cache>> {
2045 Rc::new(RefCell::new(Cache::new(None, None)))
2046 }
2047
2048 #[fixture]
2049 fn trader_id() -> TraderId {
2050 TraderId::from("TRADER-001")
2051 }
2052
2053 #[fixture]
2054 fn client_id() -> ClientId {
2055 ClientId::new("TestClient")
2056 }
2057
2058 #[fixture]
2059 fn venue() -> Venue {
2060 Venue::from("SIM")
2061 }
2062
2063 #[fixture]
2064 fn data_type() -> DataType {
2065 DataType::new("TestData", None)
2066 }
2067
2068 #[fixture]
2069 fn bar_type(audusd_sim: CurrencyPair) -> BarType {
2070 BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap()
2071 }
2072
2073 fn create_unregistered_actor() -> PyDataActor {
2074 PyDataActor::new(None)
2075 }
2076
2077 fn create_registered_actor(
2078 clock: Rc<RefCell<TestClock>>,
2079 cache: Rc<RefCell<Cache>>,
2080 trader_id: TraderId,
2081 ) -> PyDataActor {
2082 let sender = SyncDataCommandSender;
2084 set_data_cmd_sender(Arc::new(sender));
2085
2086 let mut actor = PyDataActor::new(None);
2087 actor.register(trader_id, clock, cache).unwrap();
2088 actor
2089 }
2090
2091 #[rstest]
2092 fn test_new_actor_creation() {
2093 let actor = PyDataActor::new(None);
2094 assert!(actor.trader_id().is_none());
2095 }
2096
2097 #[rstest]
2098 fn test_clock_access_before_registration_raises_error() {
2099 let actor = PyDataActor::new(None);
2100
2101 let result = actor.py_clock();
2103 assert!(result.is_err());
2104
2105 let error = result.unwrap_err();
2106 pyo3::Python::initialize();
2107 pyo3::Python::attach(|py| {
2108 assert!(error.is_instance_of::<pyo3::exceptions::PyRuntimeError>(py));
2109 });
2110
2111 let error_msg = error.to_string();
2112 assert!(
2113 error_msg.contains("Actor must be registered with a trader before accessing clock")
2114 );
2115 }
2116
2117 #[rstest]
2118 fn test_unregistered_actor_methods_work() {
2119 let actor = create_unregistered_actor();
2120
2121 assert!(!actor.py_is_ready());
2122 assert!(!actor.py_is_running());
2123 assert!(!actor.py_is_stopped());
2124 assert!(!actor.py_is_disposed());
2125 assert!(!actor.py_is_degraded());
2126 assert!(!actor.py_is_faulted());
2127
2128 assert_eq!(actor.trader_id(), None);
2130 }
2131
2132 #[rstest]
2133 fn test_registration_success(
2134 clock: Rc<RefCell<TestClock>>,
2135 cache: Rc<RefCell<Cache>>,
2136 trader_id: TraderId,
2137 ) {
2138 let mut actor = create_unregistered_actor();
2139 actor.register(trader_id, clock, cache).unwrap();
2140 assert!(actor.trader_id().is_some());
2141 assert_eq!(actor.trader_id().unwrap(), trader_id);
2142 }
2143
2144 #[rstest]
2145 fn test_registered_actor_basic_properties(
2146 clock: Rc<RefCell<TestClock>>,
2147 cache: Rc<RefCell<Cache>>,
2148 trader_id: TraderId,
2149 ) {
2150 let actor = create_registered_actor(clock, cache, trader_id);
2151
2152 assert_eq!(actor.state(), ComponentState::Ready);
2153 assert_eq!(actor.trader_id(), Some(TraderId::from("TRADER-001")));
2154 assert!(actor.py_is_ready());
2155 assert!(!actor.py_is_running());
2156 assert!(!actor.py_is_stopped());
2157 assert!(!actor.py_is_disposed());
2158 assert!(!actor.py_is_degraded());
2159 assert!(!actor.py_is_faulted());
2160 }
2161
2162 #[rstest]
2163 fn test_basic_subscription_methods_compile(
2164 clock: Rc<RefCell<TestClock>>,
2165 cache: Rc<RefCell<Cache>>,
2166 trader_id: TraderId,
2167 data_type: DataType,
2168 client_id: ClientId,
2169 audusd_sim: CurrencyPair,
2170 ) {
2171 let mut actor = create_registered_actor(clock, cache, trader_id);
2172
2173 pyo3::Python::initialize();
2174 pyo3::Python::attach(|py| {
2175 assert!(
2176 actor
2177 .py_subscribe_data(py, data_type.clone(), Some(client_id), None)
2178 .is_ok()
2179 );
2180 assert!(
2181 actor
2182 .py_subscribe_quotes(py, audusd_sim.id, Some(client_id), None)
2183 .is_ok()
2184 );
2185 assert!(
2186 actor
2187 .py_unsubscribe_data(py, data_type, Some(client_id), None)
2188 .is_ok()
2189 );
2190 assert!(
2191 actor
2192 .py_unsubscribe_quotes(py, audusd_sim.id, Some(client_id), None)
2193 .is_ok()
2194 );
2195 });
2196 }
2197
2198 #[rstest]
2199 fn test_shutdown_system_passes_through(
2200 clock: Rc<RefCell<TestClock>>,
2201 cache: Rc<RefCell<Cache>>,
2202 trader_id: TraderId,
2203 ) {
2204 let actor = create_registered_actor(clock, cache, trader_id);
2205
2206 assert!(
2207 actor
2208 .py_shutdown_system(Some("Test shutdown".to_string()))
2209 .is_ok()
2210 );
2211 assert!(actor.py_shutdown_system(None).is_ok());
2212 }
2213
2214 #[rstest]
2215 fn test_book_at_interval_invalid_interval_ms(
2216 clock: Rc<RefCell<TestClock>>,
2217 cache: Rc<RefCell<Cache>>,
2218 trader_id: TraderId,
2219 audusd_sim: CurrencyPair,
2220 ) {
2221 pyo3::Python::initialize();
2222 let mut actor = create_registered_actor(clock, cache, trader_id);
2223
2224 pyo3::Python::attach(|py| {
2225 let result = actor.py_subscribe_book_at_interval(
2226 py,
2227 audusd_sim.id,
2228 BookType::L2_MBP,
2229 0,
2230 None,
2231 None,
2232 None,
2233 );
2234 assert!(result.is_err());
2235 assert_eq!(
2236 result.unwrap_err().to_string(),
2237 "ValueError: interval_ms must be > 0"
2238 );
2239
2240 let result = actor.py_unsubscribe_book_at_interval(py, audusd_sim.id, 0, None, None);
2241 assert!(result.is_err());
2242 assert_eq!(
2243 result.unwrap_err().to_string(),
2244 "ValueError: interval_ms must be > 0"
2245 );
2246 });
2247 }
2248
2249 #[rstest]
2250 fn test_request_methods_signatures_exist() {
2251 let actor = create_unregistered_actor();
2252 assert!(actor.trader_id().is_none());
2253 }
2254
2255 #[rstest]
2256 fn test_data_actor_trait_implementation(
2257 clock: Rc<RefCell<TestClock>>,
2258 cache: Rc<RefCell<Cache>>,
2259 trader_id: TraderId,
2260 ) {
2261 let actor = create_registered_actor(clock, cache, trader_id);
2262 let state = actor.state();
2263 assert_eq!(state, ComponentState::Ready);
2264 }
2265
2266 static CALL_TRACKER: std::sync::LazyLock<Arc<Mutex<HashMap<String, i32>>>> =
2267 std::sync::LazyLock::new(|| Arc::new(Mutex::new(HashMap::new())));
2268
2269 #[derive(Debug)]
2270 struct TestDataActor {
2271 inner: PyDataActor,
2272 }
2273
2274 impl TestDataActor {
2275 fn new() -> Self {
2276 Self {
2277 inner: PyDataActor::new(None),
2278 }
2279 }
2280
2281 fn track_call(&self, handler_name: &str) {
2282 let mut tracker = CALL_TRACKER.lock().expect(MUTEX_POISONED);
2283 *tracker.entry(handler_name.to_string()).or_insert(0) += 1;
2284 }
2285
2286 fn get_call_count(&self, handler_name: &str) -> i32 {
2287 let tracker = CALL_TRACKER.lock().expect(MUTEX_POISONED);
2288 tracker.get(handler_name).copied().unwrap_or(0)
2289 }
2290
2291 fn reset_tracker(&self) {
2292 let mut tracker = CALL_TRACKER.lock().expect(MUTEX_POISONED);
2293 tracker.clear();
2294 }
2295 }
2296
2297 impl Deref for TestDataActor {
2298 type Target = DataActorCore;
2299 fn deref(&self) -> &Self::Target {
2300 &self.inner.inner().core
2301 }
2302 }
2303
2304 impl DerefMut for TestDataActor {
2305 fn deref_mut(&mut self) -> &mut Self::Target {
2306 &mut self.inner.inner_mut().core
2307 }
2308 }
2309
2310 impl DataActor for TestDataActor {
2311 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
2312 self.track_call("on_time_event");
2313 self.inner.inner_mut().on_time_event(event)
2314 }
2315
2316 fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
2317 self.track_call("on_data");
2318 self.inner.inner_mut().on_data(data)
2319 }
2320
2321 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
2322 self.track_call("on_signal");
2323 self.inner.inner_mut().on_signal(signal)
2324 }
2325
2326 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
2327 self.track_call("on_instrument");
2328 self.inner.inner_mut().on_instrument(instrument)
2329 }
2330
2331 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
2332 self.track_call("on_quote");
2333 self.inner.inner_mut().on_quote(quote)
2334 }
2335
2336 fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
2337 self.track_call("on_trade");
2338 self.inner.inner_mut().on_trade(trade)
2339 }
2340
2341 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
2342 self.track_call("on_bar");
2343 self.inner.inner_mut().on_bar(bar)
2344 }
2345
2346 fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
2347 self.track_call("on_book");
2348 self.inner.inner_mut().on_book(book)
2349 }
2350
2351 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
2352 self.track_call("on_book_deltas");
2353 self.inner.inner_mut().on_book_deltas(deltas)
2354 }
2355
2356 fn on_mark_price(&mut self, update: &MarkPriceUpdate) -> anyhow::Result<()> {
2357 self.track_call("on_mark_price");
2358 self.inner.inner_mut().on_mark_price(update)
2359 }
2360
2361 fn on_index_price(&mut self, update: &IndexPriceUpdate) -> anyhow::Result<()> {
2362 self.track_call("on_index_price");
2363 self.inner.inner_mut().on_index_price(update)
2364 }
2365
2366 fn on_instrument_status(&mut self, update: &InstrumentStatus) -> anyhow::Result<()> {
2367 self.track_call("on_instrument_status");
2368 self.inner.inner_mut().on_instrument_status(update)
2369 }
2370
2371 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
2372 self.track_call("on_instrument_close");
2373 self.inner.inner_mut().on_instrument_close(update)
2374 }
2375
2376 #[cfg(feature = "defi")]
2377 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
2378 self.track_call("on_block");
2379 self.inner.inner_mut().on_block(block)
2380 }
2381
2382 #[cfg(feature = "defi")]
2383 fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
2384 self.track_call("on_pool");
2385 self.inner.inner_mut().on_pool(pool)
2386 }
2387
2388 #[cfg(feature = "defi")]
2389 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
2390 self.track_call("on_pool_swap");
2391 self.inner.inner_mut().on_pool_swap(swap)
2392 }
2393
2394 #[cfg(feature = "defi")]
2395 fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
2396 self.track_call("on_pool_liquidity_update");
2397 self.inner.inner_mut().on_pool_liquidity_update(update)
2398 }
2399 }
2400
2401 #[rstest]
2402 fn test_python_on_signal_handler(
2403 clock: Rc<RefCell<TestClock>>,
2404 cache: Rc<RefCell<Cache>>,
2405 trader_id: TraderId,
2406 ) {
2407 pyo3::Python::initialize();
2408 let mut test_actor = TestDataActor::new();
2409 test_actor.reset_tracker();
2410 test_actor.register(trader_id, clock, cache).unwrap();
2411
2412 let signal = Signal::new(
2413 Ustr::from("test_signal"),
2414 "1.0".to_string(),
2415 UnixNanos::default(),
2416 UnixNanos::default(),
2417 );
2418
2419 assert!(test_actor.on_signal(&signal).is_ok());
2420 assert_eq!(test_actor.get_call_count("on_signal"), 1);
2421 }
2422
2423 #[rstest]
2424 fn test_python_on_data_handler(
2425 clock: Rc<RefCell<TestClock>>,
2426 cache: Rc<RefCell<Cache>>,
2427 trader_id: TraderId,
2428 ) {
2429 pyo3::Python::initialize();
2430 let mut test_actor = TestDataActor::new();
2431 test_actor.reset_tracker();
2432 test_actor.register(trader_id, clock, cache).unwrap();
2433
2434 assert!(test_actor.on_data(&()).is_ok());
2435 assert_eq!(test_actor.get_call_count("on_data"), 1);
2436 }
2437
2438 #[rstest]
2439 fn test_python_on_time_event_handler(
2440 clock: Rc<RefCell<TestClock>>,
2441 cache: Rc<RefCell<Cache>>,
2442 trader_id: TraderId,
2443 ) {
2444 pyo3::Python::initialize();
2445 let mut test_actor = TestDataActor::new();
2446 test_actor.reset_tracker();
2447 test_actor.register(trader_id, clock, cache).unwrap();
2448
2449 let time_event = TimeEvent::new(
2450 Ustr::from("test_timer"),
2451 UUID4::new(),
2452 UnixNanos::default(),
2453 UnixNanos::default(),
2454 );
2455
2456 assert!(test_actor.on_time_event(&time_event).is_ok());
2457 assert_eq!(test_actor.get_call_count("on_time_event"), 1);
2458 }
2459
2460 #[rstest]
2461 fn test_python_on_instrument_handler(
2462 clock: Rc<RefCell<TestClock>>,
2463 cache: Rc<RefCell<Cache>>,
2464 trader_id: TraderId,
2465 audusd_sim: CurrencyPair,
2466 ) {
2467 pyo3::Python::initialize();
2468 let mut rust_actor = PyDataActor::new(None);
2469 rust_actor.register(trader_id, clock, cache).unwrap();
2470
2471 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2472
2473 assert!(rust_actor.inner_mut().on_instrument(&instrument).is_ok());
2474 }
2475
2476 #[rstest]
2477 fn test_python_on_quote_handler(
2478 clock: Rc<RefCell<TestClock>>,
2479 cache: Rc<RefCell<Cache>>,
2480 trader_id: TraderId,
2481 audusd_sim: CurrencyPair,
2482 ) {
2483 pyo3::Python::initialize();
2484 let mut rust_actor = PyDataActor::new(None);
2485 rust_actor.register(trader_id, clock, cache).unwrap();
2486
2487 let quote = QuoteTick::new(
2488 audusd_sim.id,
2489 Price::from("1.0000"),
2490 Price::from("1.0001"),
2491 Quantity::from("100000"),
2492 Quantity::from("100000"),
2493 UnixNanos::default(),
2494 UnixNanos::default(),
2495 );
2496
2497 assert!(rust_actor.inner_mut().on_quote("e).is_ok());
2498 }
2499
2500 #[rstest]
2501 fn test_python_on_trade_handler(
2502 clock: Rc<RefCell<TestClock>>,
2503 cache: Rc<RefCell<Cache>>,
2504 trader_id: TraderId,
2505 audusd_sim: CurrencyPair,
2506 ) {
2507 pyo3::Python::initialize();
2508 let mut rust_actor = PyDataActor::new(None);
2509 rust_actor.register(trader_id, clock, cache).unwrap();
2510
2511 let trade = TradeTick::new(
2512 audusd_sim.id,
2513 Price::from("1.0000"),
2514 Quantity::from("100000"),
2515 AggressorSide::Buyer,
2516 "T123".to_string().into(),
2517 UnixNanos::default(),
2518 UnixNanos::default(),
2519 );
2520
2521 assert!(rust_actor.inner_mut().on_trade(&trade).is_ok());
2522 }
2523
2524 #[rstest]
2525 fn test_python_on_bar_handler(
2526 clock: Rc<RefCell<TestClock>>,
2527 cache: Rc<RefCell<Cache>>,
2528 trader_id: TraderId,
2529 audusd_sim: CurrencyPair,
2530 ) {
2531 pyo3::Python::initialize();
2532 let mut rust_actor = PyDataActor::new(None);
2533 rust_actor.register(trader_id, clock, cache).unwrap();
2534
2535 let bar_type =
2536 BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap();
2537 let bar = Bar::new(
2538 bar_type,
2539 Price::from("1.0000"),
2540 Price::from("1.0001"),
2541 Price::from("0.9999"),
2542 Price::from("1.0000"),
2543 Quantity::from("100000"),
2544 UnixNanos::default(),
2545 UnixNanos::default(),
2546 );
2547
2548 assert!(rust_actor.inner_mut().on_bar(&bar).is_ok());
2549 }
2550
2551 #[rstest]
2552 fn test_python_on_book_handler(
2553 clock: Rc<RefCell<TestClock>>,
2554 cache: Rc<RefCell<Cache>>,
2555 trader_id: TraderId,
2556 audusd_sim: CurrencyPair,
2557 ) {
2558 pyo3::Python::initialize();
2559 let mut rust_actor = PyDataActor::new(None);
2560 rust_actor.register(trader_id, clock, cache).unwrap();
2561
2562 let book = OrderBook::new(audusd_sim.id, BookType::L2_MBP);
2563 assert!(rust_actor.inner_mut().on_book(&book).is_ok());
2564 }
2565
2566 #[rstest]
2567 fn test_python_on_book_deltas_handler(
2568 clock: Rc<RefCell<TestClock>>,
2569 cache: Rc<RefCell<Cache>>,
2570 trader_id: TraderId,
2571 audusd_sim: CurrencyPair,
2572 ) {
2573 pyo3::Python::initialize();
2574 let mut rust_actor = PyDataActor::new(None);
2575 rust_actor.register(trader_id, clock, cache).unwrap();
2576
2577 let delta =
2578 OrderBookDelta::clear(audusd_sim.id, 0, UnixNanos::default(), UnixNanos::default());
2579 let deltas = OrderBookDeltas::new(audusd_sim.id, vec![delta]);
2580
2581 assert!(rust_actor.inner_mut().on_book_deltas(&deltas).is_ok());
2582 }
2583
2584 #[rstest]
2585 fn test_python_on_mark_price_handler(
2586 clock: Rc<RefCell<TestClock>>,
2587 cache: Rc<RefCell<Cache>>,
2588 trader_id: TraderId,
2589 audusd_sim: CurrencyPair,
2590 ) {
2591 pyo3::Python::initialize();
2592 let mut rust_actor = PyDataActor::new(None);
2593 rust_actor.register(trader_id, clock, cache).unwrap();
2594
2595 let mark_price = MarkPriceUpdate::new(
2596 audusd_sim.id,
2597 Price::from("1.0000"),
2598 UnixNanos::default(),
2599 UnixNanos::default(),
2600 );
2601
2602 assert!(rust_actor.inner_mut().on_mark_price(&mark_price).is_ok());
2603 }
2604
2605 #[rstest]
2606 fn test_python_on_index_price_handler(
2607 clock: Rc<RefCell<TestClock>>,
2608 cache: Rc<RefCell<Cache>>,
2609 trader_id: TraderId,
2610 audusd_sim: CurrencyPair,
2611 ) {
2612 pyo3::Python::initialize();
2613 let mut rust_actor = PyDataActor::new(None);
2614 rust_actor.register(trader_id, clock, cache).unwrap();
2615
2616 let index_price = IndexPriceUpdate::new(
2617 audusd_sim.id,
2618 Price::from("1.0000"),
2619 UnixNanos::default(),
2620 UnixNanos::default(),
2621 );
2622
2623 assert!(rust_actor.inner_mut().on_index_price(&index_price).is_ok());
2624 }
2625
2626 #[rstest]
2627 fn test_python_on_instrument_status_handler(
2628 clock: Rc<RefCell<TestClock>>,
2629 cache: Rc<RefCell<Cache>>,
2630 trader_id: TraderId,
2631 audusd_sim: CurrencyPair,
2632 ) {
2633 pyo3::Python::initialize();
2634 let mut rust_actor = PyDataActor::new(None);
2635 rust_actor.register(trader_id, clock, cache).unwrap();
2636
2637 let status = InstrumentStatus::new(
2638 audusd_sim.id,
2639 MarketStatusAction::Trading,
2640 UnixNanos::default(),
2641 UnixNanos::default(),
2642 None,
2643 None,
2644 None,
2645 None,
2646 None,
2647 );
2648
2649 assert!(rust_actor.inner_mut().on_instrument_status(&status).is_ok());
2650 }
2651
2652 #[rstest]
2653 fn test_python_on_instrument_close_handler(
2654 clock: Rc<RefCell<TestClock>>,
2655 cache: Rc<RefCell<Cache>>,
2656 trader_id: TraderId,
2657 audusd_sim: CurrencyPair,
2658 ) {
2659 pyo3::Python::initialize();
2660 let mut rust_actor = PyDataActor::new(None);
2661 rust_actor.register(trader_id, clock, cache).unwrap();
2662
2663 let close = InstrumentClose::new(
2664 audusd_sim.id,
2665 Price::from("1.0000"),
2666 InstrumentCloseType::EndOfSession,
2667 UnixNanos::default(),
2668 UnixNanos::default(),
2669 );
2670
2671 assert!(rust_actor.inner_mut().on_instrument_close(&close).is_ok());
2672 }
2673
2674 #[cfg(feature = "defi")]
2675 #[rstest]
2676 fn test_python_on_block_handler(
2677 clock: Rc<RefCell<TestClock>>,
2678 cache: Rc<RefCell<Cache>>,
2679 trader_id: TraderId,
2680 ) {
2681 pyo3::Python::initialize();
2682 let mut test_actor = TestDataActor::new();
2683 test_actor.reset_tracker();
2684 test_actor.register(trader_id, clock, cache).unwrap();
2685
2686 let block = Block::new(
2687 "0x1234567890abcdef".to_string(),
2688 "0xabcdef1234567890".to_string(),
2689 12345,
2690 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0".into(),
2691 21000,
2692 20000,
2693 UnixNanos::default(),
2694 Some(Blockchain::Ethereum),
2695 );
2696
2697 assert!(test_actor.on_block(&block).is_ok());
2698 assert_eq!(test_actor.get_call_count("on_block"), 1);
2699 }
2700
2701 #[cfg(feature = "defi")]
2702 #[rstest]
2703 fn test_python_on_pool_swap_handler(
2704 clock: Rc<RefCell<TestClock>>,
2705 cache: Rc<RefCell<Cache>>,
2706 trader_id: TraderId,
2707 ) {
2708 pyo3::Python::initialize();
2709 let mut rust_actor = PyDataActor::new(None);
2710 rust_actor.register(trader_id, clock, cache).unwrap();
2711
2712 let chain = Arc::new(Chain::new(Blockchain::Ethereum, 1));
2713 let dex = Arc::new(Dex::new(
2714 Chain::new(Blockchain::Ethereum, 1),
2715 DexType::UniswapV3,
2716 "0x1F98431c8aD98523631AE4a59f267346ea31F984",
2717 0,
2718 AmmType::CLAMM,
2719 "PoolCreated",
2720 "Swap",
2721 "Mint",
2722 "Burn",
2723 "Collect",
2724 ));
2725 let token0 = Token::new(
2726 chain.clone(),
2727 "0xa0b86a33e6441c8c06dd7b111a8c4e82e2b2a5e1"
2728 .parse()
2729 .unwrap(),
2730 "USDC".into(),
2731 "USD Coin".into(),
2732 6,
2733 );
2734 let token1 = Token::new(
2735 chain.clone(),
2736 "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
2737 .parse()
2738 .unwrap(),
2739 "WETH".into(),
2740 "Wrapped Ether".into(),
2741 18,
2742 );
2743 let pool_address = "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8"
2744 .parse()
2745 .unwrap();
2746 let pool_identifier: PoolIdentifier = "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8"
2747 .parse()
2748 .unwrap();
2749 let pool = Arc::new(Pool::new(
2750 chain.clone(),
2751 dex.clone(),
2752 pool_address,
2753 pool_identifier,
2754 12345,
2755 token0,
2756 token1,
2757 Some(500),
2758 Some(10),
2759 UnixNanos::default(),
2760 ));
2761
2762 let swap = PoolSwap::new(
2763 chain,
2764 dex,
2765 pool.instrument_id,
2766 pool.pool_identifier,
2767 12345,
2768 "0xabc123".to_string(),
2769 0,
2770 0,
2771 None,
2772 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
2773 .parse()
2774 .unwrap(),
2775 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
2776 .parse()
2777 .unwrap(),
2778 I256::from_str("1000000000000000000").unwrap(),
2779 I256::from_str("400000000000000").unwrap(),
2780 U160::from(59000000000000u128),
2781 1000000,
2782 100,
2783 );
2784
2785 assert!(rust_actor.inner_mut().on_pool_swap(&swap).is_ok());
2786 }
2787
2788 #[cfg(feature = "defi")]
2789 #[rstest]
2790 fn test_python_on_pool_liquidity_update_handler(
2791 clock: Rc<RefCell<TestClock>>,
2792 cache: Rc<RefCell<Cache>>,
2793 trader_id: TraderId,
2794 ) {
2795 pyo3::Python::initialize();
2796 let mut rust_actor = PyDataActor::new(None);
2797 rust_actor.register(trader_id, clock, cache).unwrap();
2798
2799 let block = Block::new(
2800 "0x1234567890abcdef".to_string(),
2801 "0xabcdef1234567890".to_string(),
2802 12345,
2803 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0".into(),
2804 21000,
2805 20000,
2806 UnixNanos::default(),
2807 Some(Blockchain::Ethereum),
2808 );
2809
2810 assert!(rust_actor.inner_mut().on_block(&block).is_ok());
2812 }
2813
2814 const TRACKING_ACTOR_CODE: &std::ffi::CStr = c_str!(
2815 r#"
2816class TrackingActor:
2817 """A mock Python actor that tracks all method calls."""
2818
2819 def __init__(self):
2820 self.calls = []
2821
2822 def _record(self, method_name, *args):
2823 self.calls.append((method_name, args))
2824
2825 def was_called(self, method_name):
2826 return any(call[0] == method_name for call in self.calls)
2827
2828 def call_count(self, method_name):
2829 return sum(1 for call in self.calls if call[0] == method_name)
2830
2831 def on_start(self):
2832 self._record("on_start")
2833
2834 def on_stop(self):
2835 self._record("on_stop")
2836
2837 def on_resume(self):
2838 self._record("on_resume")
2839
2840 def on_reset(self):
2841 self._record("on_reset")
2842
2843 def on_dispose(self):
2844 self._record("on_dispose")
2845
2846 def on_degrade(self):
2847 self._record("on_degrade")
2848
2849 def on_fault(self):
2850 self._record("on_fault")
2851
2852 def on_time_event(self, event):
2853 self._record("on_time_event", event)
2854
2855 def on_data(self, data):
2856 self._record("on_data", data)
2857
2858 def on_signal(self, signal):
2859 self._record("on_signal", signal)
2860
2861 def on_instrument(self, instrument):
2862 self._record("on_instrument", instrument)
2863
2864 def on_quote(self, quote):
2865 self._record("on_quote", quote)
2866
2867 def on_trade(self, trade):
2868 self._record("on_trade", trade)
2869
2870 def on_bar(self, bar):
2871 self._record("on_bar", bar)
2872
2873 def on_book(self, book):
2874 self._record("on_book", book)
2875
2876 def on_book_deltas(self, deltas):
2877 self._record("on_book_deltas", deltas)
2878
2879 def on_mark_price(self, update):
2880 self._record("on_mark_price", update)
2881
2882 def on_index_price(self, update):
2883 self._record("on_index_price", update)
2884
2885 def on_funding_rate(self, update):
2886 self._record("on_funding_rate", update)
2887
2888 def on_instrument_status(self, status):
2889 self._record("on_instrument_status", status)
2890
2891 def on_instrument_close(self, close):
2892 self._record("on_instrument_close", close)
2893
2894 def on_historical_data(self, data):
2895 self._record("on_historical_data", data)
2896
2897 def on_historical_quotes(self, quotes):
2898 self._record("on_historical_quotes", quotes)
2899
2900 def on_historical_trades(self, trades):
2901 self._record("on_historical_trades", trades)
2902
2903 def on_historical_bars(self, bars):
2904 self._record("on_historical_bars", bars)
2905
2906 def on_historical_mark_prices(self, prices):
2907 self._record("on_historical_mark_prices", prices)
2908
2909 def on_historical_index_prices(self, prices):
2910 self._record("on_historical_index_prices", prices)
2911"#
2912 );
2913
2914 fn create_tracking_python_actor(py: Python<'_>) -> PyResult<Py<PyAny>> {
2915 py.run(TRACKING_ACTOR_CODE, None, None)?;
2916 let tracking_actor_class = py.eval(c_str!("TrackingActor"), None, None)?;
2917 let instance = tracking_actor_class.call0()?;
2918 Ok(instance.unbind())
2919 }
2920
2921 fn python_method_was_called(py_actor: &Py<PyAny>, py: Python<'_>, method_name: &str) -> bool {
2922 py_actor
2923 .call_method1(py, "was_called", (method_name,))
2924 .and_then(|r| r.extract::<bool>(py))
2925 .unwrap_or(false)
2926 }
2927
2928 fn python_method_call_count(py_actor: &Py<PyAny>, py: Python<'_>, method_name: &str) -> i32 {
2929 py_actor
2930 .call_method1(py, "call_count", (method_name,))
2931 .and_then(|r| r.extract::<i32>(py))
2932 .unwrap_or(0)
2933 }
2934
2935 #[rstest]
2936 fn test_python_dispatch_on_start(
2937 clock: Rc<RefCell<TestClock>>,
2938 cache: Rc<RefCell<Cache>>,
2939 trader_id: TraderId,
2940 ) {
2941 pyo3::Python::initialize();
2942 Python::attach(|py| {
2943 let py_actor = create_tracking_python_actor(py).unwrap();
2944
2945 let mut rust_actor = PyDataActor::new(None);
2946 rust_actor.set_python_instance(py_actor.clone_ref(py));
2947 rust_actor.register(trader_id, clock, cache).unwrap();
2948
2949 let result = DataActor::on_start(rust_actor.inner_mut());
2950
2951 assert!(result.is_ok());
2952 assert!(python_method_was_called(&py_actor, py, "on_start"));
2953 assert_eq!(python_method_call_count(&py_actor, py, "on_start"), 1);
2954 });
2955 }
2956
2957 #[rstest]
2958 fn test_python_dispatch_on_stop(
2959 clock: Rc<RefCell<TestClock>>,
2960 cache: Rc<RefCell<Cache>>,
2961 trader_id: TraderId,
2962 ) {
2963 pyo3::Python::initialize();
2964 Python::attach(|py| {
2965 let py_actor = create_tracking_python_actor(py).unwrap();
2966
2967 let mut rust_actor = PyDataActor::new(None);
2968 rust_actor.set_python_instance(py_actor.clone_ref(py));
2969 rust_actor.register(trader_id, clock, cache).unwrap();
2970
2971 let result = DataActor::on_stop(rust_actor.inner_mut());
2972
2973 assert!(result.is_ok());
2974 assert!(python_method_was_called(&py_actor, py, "on_stop"));
2975 });
2976 }
2977
2978 #[rstest]
2979 fn test_python_dispatch_on_resume(
2980 clock: Rc<RefCell<TestClock>>,
2981 cache: Rc<RefCell<Cache>>,
2982 trader_id: TraderId,
2983 ) {
2984 pyo3::Python::initialize();
2985 Python::attach(|py| {
2986 let py_actor = create_tracking_python_actor(py).unwrap();
2987
2988 let mut rust_actor = PyDataActor::new(None);
2989 rust_actor.set_python_instance(py_actor.clone_ref(py));
2990 rust_actor.register(trader_id, clock, cache).unwrap();
2991
2992 let result = DataActor::on_resume(rust_actor.inner_mut());
2993
2994 assert!(result.is_ok());
2995 assert!(python_method_was_called(&py_actor, py, "on_resume"));
2996 });
2997 }
2998
2999 #[rstest]
3000 fn test_python_dispatch_on_reset(
3001 clock: Rc<RefCell<TestClock>>,
3002 cache: Rc<RefCell<Cache>>,
3003 trader_id: TraderId,
3004 ) {
3005 pyo3::Python::initialize();
3006 Python::attach(|py| {
3007 let py_actor = create_tracking_python_actor(py).unwrap();
3008
3009 let mut rust_actor = PyDataActor::new(None);
3010 rust_actor.set_python_instance(py_actor.clone_ref(py));
3011 rust_actor.register(trader_id, clock, cache).unwrap();
3012
3013 let result = DataActor::on_reset(rust_actor.inner_mut());
3014
3015 assert!(result.is_ok());
3016 assert!(python_method_was_called(&py_actor, py, "on_reset"));
3017 });
3018 }
3019
3020 #[rstest]
3021 fn test_python_dispatch_on_dispose(
3022 clock: Rc<RefCell<TestClock>>,
3023 cache: Rc<RefCell<Cache>>,
3024 trader_id: TraderId,
3025 ) {
3026 pyo3::Python::initialize();
3027 Python::attach(|py| {
3028 let py_actor = create_tracking_python_actor(py).unwrap();
3029
3030 let mut rust_actor = PyDataActor::new(None);
3031 rust_actor.set_python_instance(py_actor.clone_ref(py));
3032 rust_actor.register(trader_id, clock, cache).unwrap();
3033
3034 let result = DataActor::on_dispose(rust_actor.inner_mut());
3035
3036 assert!(result.is_ok());
3037 assert!(python_method_was_called(&py_actor, py, "on_dispose"));
3038 });
3039 }
3040
3041 #[rstest]
3042 fn test_python_dispatch_on_degrade(
3043 clock: Rc<RefCell<TestClock>>,
3044 cache: Rc<RefCell<Cache>>,
3045 trader_id: TraderId,
3046 ) {
3047 pyo3::Python::initialize();
3048 Python::attach(|py| {
3049 let py_actor = create_tracking_python_actor(py).unwrap();
3050
3051 let mut rust_actor = PyDataActor::new(None);
3052 rust_actor.set_python_instance(py_actor.clone_ref(py));
3053 rust_actor.register(trader_id, clock, cache).unwrap();
3054
3055 let result = DataActor::on_degrade(rust_actor.inner_mut());
3056
3057 assert!(result.is_ok());
3058 assert!(python_method_was_called(&py_actor, py, "on_degrade"));
3059 });
3060 }
3061
3062 #[rstest]
3063 fn test_python_dispatch_on_fault(
3064 clock: Rc<RefCell<TestClock>>,
3065 cache: Rc<RefCell<Cache>>,
3066 trader_id: TraderId,
3067 ) {
3068 pyo3::Python::initialize();
3069 Python::attach(|py| {
3070 let py_actor = create_tracking_python_actor(py).unwrap();
3071
3072 let mut rust_actor = PyDataActor::new(None);
3073 rust_actor.set_python_instance(py_actor.clone_ref(py));
3074 rust_actor.register(trader_id, clock, cache).unwrap();
3075
3076 let result = DataActor::on_fault(rust_actor.inner_mut());
3077
3078 assert!(result.is_ok());
3079 assert!(python_method_was_called(&py_actor, py, "on_fault"));
3080 });
3081 }
3082
3083 #[rstest]
3084 fn test_python_dispatch_on_signal(
3085 clock: Rc<RefCell<TestClock>>,
3086 cache: Rc<RefCell<Cache>>,
3087 trader_id: TraderId,
3088 ) {
3089 pyo3::Python::initialize();
3090 Python::attach(|py| {
3091 let py_actor = create_tracking_python_actor(py).unwrap();
3092
3093 let mut rust_actor = PyDataActor::new(None);
3094 rust_actor.set_python_instance(py_actor.clone_ref(py));
3095 rust_actor.register(trader_id, clock, cache).unwrap();
3096
3097 let signal = Signal::new(
3098 Ustr::from("test_signal"),
3099 "1.0".to_string(),
3100 UnixNanos::default(),
3101 UnixNanos::default(),
3102 );
3103
3104 let result = rust_actor.inner_mut().on_signal(&signal);
3105
3106 assert!(result.is_ok());
3107 assert!(python_method_was_called(&py_actor, py, "on_signal"));
3108 });
3109 }
3110
3111 #[rstest]
3112 fn test_python_dispatch_on_time_event(
3113 clock: Rc<RefCell<TestClock>>,
3114 cache: Rc<RefCell<Cache>>,
3115 trader_id: TraderId,
3116 ) {
3117 pyo3::Python::initialize();
3118 Python::attach(|py| {
3119 let py_actor = create_tracking_python_actor(py).unwrap();
3120
3121 let mut rust_actor = PyDataActor::new(None);
3122 rust_actor.set_python_instance(py_actor.clone_ref(py));
3123 rust_actor.register(trader_id, clock, cache).unwrap();
3124
3125 let time_event = TimeEvent::new(
3126 Ustr::from("test_timer"),
3127 UUID4::new(),
3128 UnixNanos::default(),
3129 UnixNanos::default(),
3130 );
3131
3132 let result = rust_actor.inner_mut().on_time_event(&time_event);
3133
3134 assert!(result.is_ok());
3135 assert!(python_method_was_called(&py_actor, py, "on_time_event"));
3136 });
3137 }
3138
3139 #[rstest]
3140 fn test_python_dispatch_on_instrument(
3141 clock: Rc<RefCell<TestClock>>,
3142 cache: Rc<RefCell<Cache>>,
3143 trader_id: TraderId,
3144 audusd_sim: CurrencyPair,
3145 ) {
3146 pyo3::Python::initialize();
3147 Python::attach(|py| {
3148 let py_actor = create_tracking_python_actor(py).unwrap();
3149
3150 let mut rust_actor = PyDataActor::new(None);
3151 rust_actor.set_python_instance(py_actor.clone_ref(py));
3152 rust_actor.register(trader_id, clock, cache).unwrap();
3153
3154 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3155
3156 let result = rust_actor.inner_mut().on_instrument(&instrument);
3157
3158 assert!(result.is_ok());
3159 assert!(python_method_was_called(&py_actor, py, "on_instrument"));
3160 });
3161 }
3162
3163 #[rstest]
3164 fn test_python_dispatch_on_quote(
3165 clock: Rc<RefCell<TestClock>>,
3166 cache: Rc<RefCell<Cache>>,
3167 trader_id: TraderId,
3168 audusd_sim: CurrencyPair,
3169 ) {
3170 pyo3::Python::initialize();
3171 Python::attach(|py| {
3172 let py_actor = create_tracking_python_actor(py).unwrap();
3173
3174 let mut rust_actor = PyDataActor::new(None);
3175 rust_actor.set_python_instance(py_actor.clone_ref(py));
3176 rust_actor.register(trader_id, clock, cache).unwrap();
3177
3178 let quote = QuoteTick::new(
3179 audusd_sim.id,
3180 Price::from("1.00000"),
3181 Price::from("1.00001"),
3182 Quantity::from(100_000),
3183 Quantity::from(100_000),
3184 UnixNanos::default(),
3185 UnixNanos::default(),
3186 );
3187
3188 let result = rust_actor.inner_mut().on_quote("e);
3189
3190 assert!(result.is_ok());
3191 assert!(python_method_was_called(&py_actor, py, "on_quote"));
3192 });
3193 }
3194
3195 #[rstest]
3196 fn test_python_dispatch_on_trade(
3197 clock: Rc<RefCell<TestClock>>,
3198 cache: Rc<RefCell<Cache>>,
3199 trader_id: TraderId,
3200 audusd_sim: CurrencyPair,
3201 ) {
3202 pyo3::Python::initialize();
3203 Python::attach(|py| {
3204 let py_actor = create_tracking_python_actor(py).unwrap();
3205
3206 let mut rust_actor = PyDataActor::new(None);
3207 rust_actor.set_python_instance(py_actor.clone_ref(py));
3208 rust_actor.register(trader_id, clock, cache).unwrap();
3209
3210 let trade = TradeTick::new(
3211 audusd_sim.id,
3212 Price::from("1.00000"),
3213 Quantity::from(100_000),
3214 AggressorSide::Buyer,
3215 TradeId::new("123456"),
3216 UnixNanos::default(),
3217 UnixNanos::default(),
3218 );
3219
3220 let result = rust_actor.inner_mut().on_trade(&trade);
3221
3222 assert!(result.is_ok());
3223 assert!(python_method_was_called(&py_actor, py, "on_trade"));
3224 });
3225 }
3226
3227 #[rstest]
3228 fn test_python_dispatch_on_bar(
3229 clock: Rc<RefCell<TestClock>>,
3230 cache: Rc<RefCell<Cache>>,
3231 trader_id: TraderId,
3232 audusd_sim: CurrencyPair,
3233 ) {
3234 pyo3::Python::initialize();
3235 Python::attach(|py| {
3236 let py_actor = create_tracking_python_actor(py).unwrap();
3237
3238 let mut rust_actor = PyDataActor::new(None);
3239 rust_actor.set_python_instance(py_actor.clone_ref(py));
3240 rust_actor.register(trader_id, clock, cache).unwrap();
3241
3242 let bar_type =
3243 BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap();
3244 let bar = Bar::new(
3245 bar_type,
3246 Price::from("1.00000"),
3247 Price::from("1.00010"),
3248 Price::from("0.99990"),
3249 Price::from("1.00005"),
3250 Quantity::from(100_000),
3251 UnixNanos::default(),
3252 UnixNanos::default(),
3253 );
3254
3255 let result = rust_actor.inner_mut().on_bar(&bar);
3256
3257 assert!(result.is_ok());
3258 assert!(python_method_was_called(&py_actor, py, "on_bar"));
3259 });
3260 }
3261
3262 #[rstest]
3263 fn test_python_dispatch_on_book(
3264 clock: Rc<RefCell<TestClock>>,
3265 cache: Rc<RefCell<Cache>>,
3266 trader_id: TraderId,
3267 audusd_sim: CurrencyPair,
3268 ) {
3269 pyo3::Python::initialize();
3270 Python::attach(|py| {
3271 let py_actor = create_tracking_python_actor(py).unwrap();
3272
3273 let mut rust_actor = PyDataActor::new(None);
3274 rust_actor.set_python_instance(py_actor.clone_ref(py));
3275 rust_actor.register(trader_id, clock, cache).unwrap();
3276
3277 let book = OrderBook::new(audusd_sim.id, BookType::L2_MBP);
3278
3279 let result = rust_actor.inner_mut().on_book(&book);
3280
3281 assert!(result.is_ok());
3282 assert!(python_method_was_called(&py_actor, py, "on_book"));
3283 });
3284 }
3285
3286 #[rstest]
3287 fn test_python_dispatch_on_book_deltas(
3288 clock: Rc<RefCell<TestClock>>,
3289 cache: Rc<RefCell<Cache>>,
3290 trader_id: TraderId,
3291 audusd_sim: CurrencyPair,
3292 ) {
3293 pyo3::Python::initialize();
3294 Python::attach(|py| {
3295 let py_actor = create_tracking_python_actor(py).unwrap();
3296
3297 let mut rust_actor = PyDataActor::new(None);
3298 rust_actor.set_python_instance(py_actor.clone_ref(py));
3299 rust_actor.register(trader_id, clock, cache).unwrap();
3300
3301 let delta =
3302 OrderBookDelta::clear(audusd_sim.id, 0, UnixNanos::default(), UnixNanos::default());
3303 let deltas = OrderBookDeltas::new(audusd_sim.id, vec![delta]);
3304
3305 let result = rust_actor.inner_mut().on_book_deltas(&deltas);
3306
3307 assert!(result.is_ok());
3308 assert!(python_method_was_called(&py_actor, py, "on_book_deltas"));
3309 });
3310 }
3311
3312 #[rstest]
3313 fn test_python_dispatch_on_mark_price(
3314 clock: Rc<RefCell<TestClock>>,
3315 cache: Rc<RefCell<Cache>>,
3316 trader_id: TraderId,
3317 audusd_sim: CurrencyPair,
3318 ) {
3319 pyo3::Python::initialize();
3320 Python::attach(|py| {
3321 let py_actor = create_tracking_python_actor(py).unwrap();
3322
3323 let mut rust_actor = PyDataActor::new(None);
3324 rust_actor.set_python_instance(py_actor.clone_ref(py));
3325 rust_actor.register(trader_id, clock, cache).unwrap();
3326
3327 let mark_price = MarkPriceUpdate::new(
3328 audusd_sim.id,
3329 Price::from("1.00000"),
3330 UnixNanos::default(),
3331 UnixNanos::default(),
3332 );
3333
3334 let result = rust_actor.inner_mut().on_mark_price(&mark_price);
3335
3336 assert!(result.is_ok());
3337 assert!(python_method_was_called(&py_actor, py, "on_mark_price"));
3338 });
3339 }
3340
3341 #[rstest]
3342 fn test_python_dispatch_on_index_price(
3343 clock: Rc<RefCell<TestClock>>,
3344 cache: Rc<RefCell<Cache>>,
3345 trader_id: TraderId,
3346 audusd_sim: CurrencyPair,
3347 ) {
3348 pyo3::Python::initialize();
3349 Python::attach(|py| {
3350 let py_actor = create_tracking_python_actor(py).unwrap();
3351
3352 let mut rust_actor = PyDataActor::new(None);
3353 rust_actor.set_python_instance(py_actor.clone_ref(py));
3354 rust_actor.register(trader_id, clock, cache).unwrap();
3355
3356 let index_price = IndexPriceUpdate::new(
3357 audusd_sim.id,
3358 Price::from("1.00000"),
3359 UnixNanos::default(),
3360 UnixNanos::default(),
3361 );
3362
3363 let result = rust_actor.inner_mut().on_index_price(&index_price);
3364
3365 assert!(result.is_ok());
3366 assert!(python_method_was_called(&py_actor, py, "on_index_price"));
3367 });
3368 }
3369
3370 #[rstest]
3371 fn test_python_dispatch_on_instrument_status(
3372 clock: Rc<RefCell<TestClock>>,
3373 cache: Rc<RefCell<Cache>>,
3374 trader_id: TraderId,
3375 audusd_sim: CurrencyPair,
3376 ) {
3377 pyo3::Python::initialize();
3378 Python::attach(|py| {
3379 let py_actor = create_tracking_python_actor(py).unwrap();
3380
3381 let mut rust_actor = PyDataActor::new(None);
3382 rust_actor.set_python_instance(py_actor.clone_ref(py));
3383 rust_actor.register(trader_id, clock, cache).unwrap();
3384
3385 let status = InstrumentStatus::new(
3386 audusd_sim.id,
3387 MarketStatusAction::Trading,
3388 UnixNanos::default(),
3389 UnixNanos::default(),
3390 None,
3391 None,
3392 None,
3393 None,
3394 None,
3395 );
3396
3397 let result = rust_actor.inner_mut().on_instrument_status(&status);
3398
3399 assert!(result.is_ok());
3400 assert!(python_method_was_called(
3401 &py_actor,
3402 py,
3403 "on_instrument_status"
3404 ));
3405 });
3406 }
3407
3408 #[rstest]
3409 fn test_python_dispatch_on_instrument_close(
3410 clock: Rc<RefCell<TestClock>>,
3411 cache: Rc<RefCell<Cache>>,
3412 trader_id: TraderId,
3413 audusd_sim: CurrencyPair,
3414 ) {
3415 pyo3::Python::initialize();
3416 Python::attach(|py| {
3417 let py_actor = create_tracking_python_actor(py).unwrap();
3418
3419 let mut rust_actor = PyDataActor::new(None);
3420 rust_actor.set_python_instance(py_actor.clone_ref(py));
3421 rust_actor.register(trader_id, clock, cache).unwrap();
3422
3423 let close = InstrumentClose::new(
3424 audusd_sim.id,
3425 Price::from("1.00000"),
3426 InstrumentCloseType::EndOfSession,
3427 UnixNanos::default(),
3428 UnixNanos::default(),
3429 );
3430
3431 let result = rust_actor.inner_mut().on_instrument_close(&close);
3432
3433 assert!(result.is_ok());
3434 assert!(python_method_was_called(
3435 &py_actor,
3436 py,
3437 "on_instrument_close"
3438 ));
3439 });
3440 }
3441
3442 #[rstest]
3443 fn test_python_dispatch_multiple_calls_tracked(
3444 clock: Rc<RefCell<TestClock>>,
3445 cache: Rc<RefCell<Cache>>,
3446 trader_id: TraderId,
3447 audusd_sim: CurrencyPair,
3448 ) {
3449 pyo3::Python::initialize();
3450 Python::attach(|py| {
3451 let py_actor = create_tracking_python_actor(py).unwrap();
3452
3453 let mut rust_actor = PyDataActor::new(None);
3454 rust_actor.set_python_instance(py_actor.clone_ref(py));
3455 rust_actor.register(trader_id, clock, cache).unwrap();
3456
3457 let quote = QuoteTick::new(
3458 audusd_sim.id,
3459 Price::from("1.00000"),
3460 Price::from("1.00001"),
3461 Quantity::from(100_000),
3462 Quantity::from(100_000),
3463 UnixNanos::default(),
3464 UnixNanos::default(),
3465 );
3466
3467 rust_actor.inner_mut().on_quote("e).unwrap();
3468 rust_actor.inner_mut().on_quote("e).unwrap();
3469 rust_actor.inner_mut().on_quote("e).unwrap();
3470
3471 assert_eq!(python_method_call_count(&py_actor, py, "on_quote"), 3);
3472 });
3473 }
3474
3475 #[rstest]
3476 fn test_python_dispatch_no_call_when_py_self_not_set(
3477 clock: Rc<RefCell<TestClock>>,
3478 cache: Rc<RefCell<Cache>>,
3479 trader_id: TraderId,
3480 ) {
3481 pyo3::Python::initialize();
3482 Python::attach(|_py| {
3483 let mut rust_actor = PyDataActor::new(None);
3484 rust_actor.register(trader_id, clock, cache).unwrap();
3485
3486 let result = DataActor::on_start(rust_actor.inner_mut());
3488 assert!(result.is_ok());
3489 });
3490 }
3491}