use std::sync::Arc;
use nautilus_infrastructure::sql::pg::PostgresConnectOptions;
use nautilus_model::defi::{Chain, DexType};
use pyo3::prelude::*;
use crate::config::{BlockchainDataClientConfig, DexPoolFilters};
#[pymethods]
#[pyo3_stub_gen::derive::gen_stub_pymethods(module = "nautilus_trader.adapters.blockchain")]
impl DexPoolFilters {
#[new]
#[must_use]
pub fn py_new(remove_pools_with_empty_erc20_fields: Option<bool>) -> Self {
Self::builder()
.maybe_remove_pools_with_empty_erc20fields(remove_pools_with_empty_erc20_fields)
.build()
}
}
#[pymethods]
#[pyo3_stub_gen::derive::gen_stub_pymethods(module = "nautilus_trader.adapters.blockchain")]
impl BlockchainDataClientConfig {
#[new]
#[expect(clippy::too_many_arguments)]
#[pyo3(signature = (chain, dex_ids, http_rpc_url, rpc_requests_per_second=None, multicall_calls_per_rpc_request=None, wss_rpc_url=None, use_hypersync_for_live_data=true, from_block=None, pool_filters=None, postgres_cache_database_config=None, proxy_url=None))]
fn py_new(
#[gen_stub(
override_type(
type_repr = "nautilus_trader.model.Chain",
imports = ("nautilus_trader.model",),
),
)]
chain: &Chain,
#[gen_stub(
override_type(
type_repr = "typing.Sequence[nautilus_trader.model.DexType]",
imports = ("typing", "nautilus_trader.model"),
),
)]
dex_ids: Vec<DexType>,
http_rpc_url: String,
rpc_requests_per_second: Option<u32>,
multicall_calls_per_rpc_request: Option<u32>,
wss_rpc_url: Option<String>,
use_hypersync_for_live_data: bool,
from_block: Option<u64>,
pool_filters: Option<DexPoolFilters>,
#[gen_stub(
override_type(
type_repr = "typing.Optional[nautilus_trader.infrastructure.PostgresConnectOptions]",
imports = ("typing", "nautilus_trader.infrastructure"),
),
)]
postgres_cache_database_config: Option<PostgresConnectOptions>,
proxy_url: Option<String>,
) -> Self {
Self::builder()
.chain(Arc::new(chain.clone()))
.dex_ids(dex_ids)
.http_rpc_url(http_rpc_url)
.maybe_rpc_requests_per_second(rpc_requests_per_second)
.maybe_multicall_calls_per_rpc_request(multicall_calls_per_rpc_request)
.maybe_wss_rpc_url(wss_rpc_url)
.use_hypersync_for_live_data(use_hypersync_for_live_data)
.maybe_from_block(from_block)
.maybe_pool_filters(pool_filters)
.maybe_postgres_cache_database_config(postgres_cache_database_config)
.maybe_proxy_url(proxy_url)
.build()
}
#[getter]
#[gen_stub(
override_return_type(
type_repr = "nautilus_trader.model.Chain",
imports = ("nautilus_trader.model",),
),
)]
fn chain(&self) -> Chain {
(*self.chain).clone()
}
#[getter]
fn http_rpc_url(&self) -> String {
self.http_rpc_url.clone()
}
#[getter]
fn wss_rpc_url(&self) -> Option<String> {
self.wss_rpc_url.clone()
}
#[getter]
const fn rpc_requests_per_second(&self) -> Option<u32> {
self.rpc_requests_per_second
}
#[getter]
const fn use_hypersync_for_live_data(&self) -> bool {
self.use_hypersync_for_live_data
}
#[getter]
#[expect(clippy::wrong_self_convention)]
const fn from_block(&self) -> Option<u64> {
self.from_block
}
#[getter]
fn proxy_url(&self) -> Option<String> {
self.proxy_url.clone()
}
fn __repr__(&self) -> String {
format!(
"BlockchainDataClientConfig(chain={:?}, http_rpc_url={}, wss_rpc_url={:?}, use_hypersync_for_live_data={}, from_block={:?})",
self.chain.name,
self.http_rpc_url,
self.wss_rpc_url,
self.use_hypersync_for_live_data,
self.from_block
)
}
}