use std::sync::Arc;
use ahash::AHashMap;
use alloy::primitives::Address;
use futures_util::Stream;
use hypersync_client::{
StreamConfig,
net_types::{BlockField, BlockSelection, FieldSelection, Query},
simple_types::Log,
};
use nautilus_common::live::get_runtime;
use nautilus_core::hex;
use nautilus_model::{
defi::{Block, DexType, SharedChain},
identifiers::InstrumentId,
};
use nautilus_network::http::Url;
use crate::{
exchanges::get_dex_extended, hypersync::transform::transform_hypersync_block,
rpc::types::BlockchainMessage,
};
/// Delay, in milliseconds, between consecutive chain-height checks while the blocks
/// subscription waits for the chain to reach the next block it wants to request.
const BLOCK_POLLING_INTERVAL_MS: u64 = 50;
/// Upper bound, in seconds, on a single HyperSync `get` request issued by the blocks
/// subscription; a request that exceeds it is logged and retried.
const HYPERSYNC_REQUEST_TIMEOUT_SECS: u64 = 30;
/// How long, in seconds, `disconnect` waits for the blocks task to finish on its own
/// before aborting it.
const DISCONNECT_TIMEOUT_SECS: u64 = 5;
/// Client that pulls blocks and DEX contract events for one chain from HyperSync and
/// forwards them as [`BlockchainMessage`]s over an optional channel.
#[derive(Debug)]
pub struct HyperSyncClient {
/// Chain this client is bound to; supplies the HyperSync URL and the chain name.
chain: SharedChain,
/// Shared handle to the underlying HyperSync client, cloned into spawned tasks.
client: Arc<hypersync_client::Client>,
/// Handle of the background task started by `subscribe_blocks`; `None` while no
/// blocks subscription is active.
blocks_task: Option<tokio::task::JoinHandle<()>>,
/// Child of `cancellation_token` that stops only the blocks task; set by
/// `subscribe_blocks` and consumed by `unsubscribe_blocks`.
blocks_cancellation_token: Option<tokio_util::sync::CancellationToken>,
/// Sender used to publish blocks and DEX events. Methods that need it log an error
/// and return early when it is `None`.
tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
/// Pool contract addresses keyed by instrument; read through `get_pool_address`.
pool_addresses: AHashMap<InstrumentId, Address>,
/// Token cancelled by `disconnect`; spawned tasks watch it (or a child of it) to stop.
cancellation_token: tokio_util::sync::CancellationToken,
}
impl HyperSyncClient {
/// Creates a client for `chain` that publishes messages on `tx` and stops its
/// background work when `cancellation_token` is cancelled.
///
/// The HyperSync endpoint is taken from `chain.hypersync_url` and the API token from
/// the `ENVIO_API_TOKEN` environment variable. No subscription is started here.
///
/// # Panics
///
/// Panics if the chain's HyperSync URL cannot be parsed, if `ENVIO_API_TOKEN` cannot
/// be read from the environment, or if the underlying HyperSync client cannot be
/// constructed from the resulting configuration.
#[must_use]
pub fn new(
    chain: SharedChain,
    tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
    cancellation_token: tokio_util::sync::CancellationToken,
) -> Self {
    // Validate the endpoint before handing it to the HyperSync configuration.
    let parsed_url = Url::parse(chain.hypersync_url.as_str()).expect("Invalid HyperSync URL");
    let api_token = std::env::var("ENVIO_API_TOKEN")
        .expect("ENVIO_API_TOKEN environment variable must be set");

    let mut config = hypersync_client::ClientConfig::default();
    config.url = parsed_url.to_string();
    config.api_token = api_token;

    let client = Arc::new(
        hypersync_client::Client::new(config)
            .expect("Failed to create HyperSync client - check ENVIO_API_TOKEN is a valid UUID"),
    );

    Self {
        chain,
        client,
        blocks_task: None,
        blocks_cancellation_token: None,
        tx,
        pool_addresses: AHashMap::new(),
        cancellation_token,
    }
}
/// Looks up the pool contract address stored for `instrument_id`.
///
/// Returns `None` when no address is recorded for that instrument.
#[must_use]
pub fn get_pool_address(&self, instrument_id: InstrumentId) -> Option<&Address> {
    let key = &instrument_id;
    self.pool_addresses.get(key)
}
/// Streams the swap, mint and burn logs emitted by `contract_addresses` for a single
/// block and publishes each parsed event on the client's message channel.
///
/// The HyperSync query is built with `from_block = block` and `to_block = block + 1`,
/// filtered on the three supplied event signatures. The work happens on a detached
/// task spawned on the shared runtime, so this method returns immediately. The task
/// ends when the stream is exhausted, when the stream yields an error, or when the
/// client's cancellation token is cancelled.
///
/// Each log is dispatched on its first topic, hex-encoded with a `0x` prefix and
/// compared against the supplied signatures. Logs without a first topic are skipped.
/// Parse failures, send failures and unrecognised signatures are logged and do not
/// stop the task.
///
/// If the client was created without a channel, an error is logged and nothing is
/// spawned.
///
/// # Panics
///
/// Panics if `get_dex_extended` cannot resolve `dex` for this client's chain.
pub fn process_block_dex_contract_events(
    &mut self,
    dex: &DexType,
    block: u64,
    contract_addresses: &[Address],
    swap_event_encoded_signature: String,
    mint_event_encoded_signature: String,
    burn_event_encoded_signature: String,
) {
    // A fixed-size array of plain `&str` is all the query builder needs; no heap
    // allocation and no `&&str` double borrows.
    let topics = [
        swap_event_encoded_signature.as_str(),
        mint_event_encoded_signature.as_str(),
        burn_event_encoded_signature.as_str(),
    ];
    let query = Self::construct_contract_events_query(
        block,
        Some(block + 1),
        contract_addresses,
        &topics,
    );

    let Some(tx) = self.tx.clone() else {
        log::error!("Hypersync client channel should have been initialized");
        return;
    };
    let client = self.client.clone();
    let dex_extended =
        get_dex_extended(self.chain.name, dex).expect("Failed to get dex extended");
    let cancellation_token = self.cancellation_token.clone();

    // The join handle is dropped on purpose: the task is bounded by the one-block
    // query and by the cancellation token.
    let _task = get_runtime().spawn(async move {
        let mut rx = match client.stream(query, StreamConfig::default()).await {
            Ok(rx) => rx,
            Err(e) => {
                log::error!("Failed to create DEX event stream: {e}");
                return;
            }
        };

        loop {
            tokio::select! {
                () = cancellation_token.cancelled() => {
                    log::debug!("DEX event processing task received cancellation signal");
                    break;
                }
                response = rx.recv() => {
                    // `None` means the stream has delivered everything it had.
                    let Some(response) = response else {
                        break;
                    };
                    let response = match response {
                        Ok(resp) => resp,
                        Err(e) => {
                            log::error!("Failed to receive DEX event stream response: {e}");
                            break;
                        }
                    };

                    for log in response.data.logs.into_iter().flatten() {
                        // topic0 carries the event signature; a log without it
                        // cannot be classified.
                        let Some(topic0) = log.topics.first().and_then(|t| t.as_ref()) else {
                            continue;
                        };
                        let event_signature = hex::encode_prefixed(topic0.as_ref());

                        if event_signature == swap_event_encoded_signature {
                            match dex_extended.parse_swap_event_hypersync(&log) {
                                Ok(swap_event) => {
                                    if let Err(e) =
                                        tx.send(BlockchainMessage::SwapEvent(swap_event))
                                    {
                                        log::error!("Failed to send swap event: {e}");
                                    }
                                }
                                Err(e) => {
                                    log::error!(
                                        "Failed to parse swap with error '{e:?}' for event: {log:?}",
                                    );
                                }
                            }
                        } else if event_signature == mint_event_encoded_signature {
                            match dex_extended.parse_mint_event_hypersync(&log) {
                                Ok(mint_event) => {
                                    if let Err(e) =
                                        tx.send(BlockchainMessage::MintEvent(mint_event))
                                    {
                                        log::error!("Failed to send mint event: {e}");
                                    }
                                }
                                Err(e) => {
                                    log::error!(
                                        "Failed to parse mint with error '{e:?}' for event: {log:?}",
                                    );
                                }
                            }
                        } else if event_signature == burn_event_encoded_signature {
                            match dex_extended.parse_burn_event_hypersync(&log) {
                                Ok(burn_event) => {
                                    if let Err(e) =
                                        tx.send(BlockchainMessage::BurnEvent(burn_event))
                                    {
                                        log::error!("Failed to send burn event: {e}");
                                    }
                                }
                                Err(e) => {
                                    log::error!(
                                        "Failed to parse burn with error '{e:?}' for event: {log:?}",
                                    );
                                }
                            }
                        } else {
                            log::error!("Unknown event signature: {event_signature}");
                        }
                    }
                }
            }
        }
    });
}
pub async fn request_contract_events_stream(
&self,
from_block: u64,
to_block: Option<u64>,
contract_address: &Address,
topics: Vec<&str>,
) -> impl Stream<Item = Log> + use<> {
let query = Self::construct_contract_events_query(
from_block,
to_block,
&[*contract_address],
&topics,
);
let mut rx = self
.client
.clone()
.stream(query, StreamConfig::default())
.await
.expect("Failed to create stream");
async_stream::stream! {
while let Some(response) = rx.recv().await {
let response = response.unwrap();
for batch in response.data.logs {
for log in batch {
yield log
}
}
}
}
}
/// Cancels the client's cancellation token and waits for the blocks task to stop.
///
/// If a blocks task is running it is given `DISCONNECT_TIMEOUT_SECS` seconds to
/// finish after cancellation; past that deadline it is aborted and awaited so that
/// it is no longer running when this method returns.
pub async fn disconnect(&mut self) {
    log::debug!("Disconnecting HyperSync client");
    self.cancellation_token.cancel();

    if let Some(mut handle) = self.blocks_task.take() {
        let grace_period = std::time::Duration::from_secs(DISCONNECT_TIMEOUT_SECS);
        // Borrow the handle so it can still be aborted if the wait times out.
        let outcome = tokio::time::timeout(grace_period, &mut handle).await;

        match outcome {
            Err(_) => {
                log::warn!(
                    "Blocks task did not complete within {DISCONNECT_TIMEOUT_SECS}s timeout, \
                    aborting task (this is expected if Hypersync long-poll was in progress)"
                );
                handle.abort();
                let _ = handle.await;
            }
            Ok(Err(e)) => {
                log::error!("Error awaiting blocks task: {e}");
            }
            Ok(Ok(())) => {
                log::debug!("Blocks task completed gracefully");
            }
        }
    }

    log::debug!("HyperSync client disconnected");
}
/// Returns the chain height currently reported by the HyperSync client.
///
/// # Panics
///
/// Panics if the height request fails.
pub async fn current_block(&self) -> u64 {
    let height = self.client.get_height().await;
    height.unwrap()
}
/// Opens a HyperSync stream of blocks from `from_block` up to the optional
/// `to_block` and yields each one converted to the domain [`Block`] type.
///
/// The returned stream owns everything it needs (the receiver and the chain name),
/// so it is declared with `use<>` and does not keep `self` borrowed, matching
/// `request_contract_events_stream`.
///
/// # Panics
///
/// Panics if the HyperSync stream cannot be created. The returned stream itself
/// panics when polled if HyperSync delivers an error response or if a received block
/// cannot be transformed.
pub async fn request_blocks_stream(
    &self,
    from_block: u64,
    to_block: Option<u64>,
) -> impl Stream<Item = Block> + use<> {
    let query = Self::construct_block_query(from_block, to_block);
    let mut rx = self
        .client
        .clone()
        .stream(query, StreamConfig::default())
        .await
        .expect("Failed to create stream");
    let chain = self.chain.name;

    async_stream::stream! {
        while let Some(response) = rx.recv().await {
            let response = response.unwrap();
            // Each response holds batches of blocks; emit them flattened.
            for received_block in response.data.blocks.into_iter().flatten() {
                yield transform_hypersync_block(chain, received_block).unwrap();
            }
        }
    }
}
pub fn subscribe_blocks(&mut self) {
if self.blocks_task.is_some() {
return;
}
let chain = self.chain.name;
let client = self.client.clone();
let tx = if let Some(tx) = &self.tx {
tx.clone()
} else {
log::error!("Hypersync client channel should have been initialized");
return;
};
let blocks_token = self.cancellation_token.child_token();
let cancellation_token = blocks_token.clone();
self.blocks_cancellation_token = Some(blocks_token);
let task = get_runtime().spawn(async move {
log::debug!("Starting task 'blocks_feed");
let current_block_height = client.get_height().await.unwrap();
let mut query = Self::construct_block_query(current_block_height, None);
loop {
tokio::select! {
() = cancellation_token.cancelled() => {
log::debug!("Blocks subscription task received cancellation signal");
break;
}
result = tokio::time::timeout(
std::time::Duration::from_secs(HYPERSYNC_REQUEST_TIMEOUT_SECS),
client.get(&query)
) => {
let response = match result {
Ok(Ok(resp)) => resp,
Ok(Err(e)) => {
log::error!("Hypersync request failed: {e}");
break;
}
Err(_) => {
log::warn!("Hypersync request timed out after {HYPERSYNC_REQUEST_TIMEOUT_SECS}s, retrying...");
continue;
}
};
for batch in response.data.blocks {
for received_block in batch {
let block = transform_hypersync_block(chain, received_block).unwrap();
let msg = BlockchainMessage::Block(block);
if let Err(e) = tx.send(msg) {
log::error!("Error sending message: {e}");
}
}
}
if let Some(archive_block_height) = response.archive_height
&& archive_block_height < response.next_block
{
while client.get_height().await.unwrap() < response.next_block {
tokio::select! {
() = cancellation_token.cancelled() => {
log::debug!("Blocks subscription task received cancellation signal during polling");
return;
}
() = tokio::time::sleep(std::time::Duration::from_millis(
BLOCK_POLLING_INTERVAL_MS,
)) => {}
}
}
}
query.from_block = response.next_block;
}
}
}
});
self.blocks_task = Some(task);
}
/// Builds a HyperSync query that selects blocks from `from_block` up to the optional
/// `to_block`, requesting every block field.
fn construct_block_query(from_block: u64, to_block: Option<u64>) -> Query {
    let field_selection = FieldSelection {
        block: BlockField::all(),
        ..Default::default()
    };
    let blocks = vec![BlockSelection::default()];

    Query {
        from_block,
        to_block,
        blocks,
        field_selection,
        ..Default::default()
    }
}
/// Builds a HyperSync log query for `contract_addresses` starting at `from_block`,
/// with `topics` as the single entry of the log selection's topic filter.
///
/// `to_block` is only written into the query when it is `Some`. The query is
/// assembled as JSON and then deserialized into a [`Query`].
///
/// # Panics
///
/// Panics if the assembled JSON cannot be deserialized into a [`Query`].
fn construct_contract_events_query(
    from_block: u64,
    to_block: Option<u64>,
    contract_addresses: &[Address],
    topics: &[&str],
) -> Query {
    // Log columns requested from HyperSync for every matching event.
    let log_fields = [
        "block_number",
        "transaction_hash",
        "transaction_index",
        "log_index",
        "address",
        "data",
        "topic0",
        "topic1",
        "topic2",
        "topic3",
    ];
    let log_selection = serde_json::json!({
        "topics": [topics],
        "address": contract_addresses
    });

    let mut raw_query = serde_json::json!({
        "from_block": from_block,
        "logs": [log_selection],
        "field_selection": {
            "log": log_fields
        }
    });
    if let Some(upper_bound) = to_block {
        raw_query["to_block"] = serde_json::json!(upper_bound);
    }

    serde_json::from_value(raw_query).unwrap()
}
/// Stops the blocks subscription, if one is registered, and waits for its task to
/// finish.
///
/// Only the blocks task's own token is cancelled; the client's cancellation token is
/// left untouched. A failure while joining the task is logged.
pub async fn unsubscribe_blocks(&mut self) {
    let Some(handle) = self.blocks_task.take() else {
        return;
    };

    if let Some(token) = self.blocks_cancellation_token.take() {
        token.cancel();
    }

    let joined = handle.await;
    if let Err(e) = joined {
        log::error!("Error awaiting blocks task during unsubscribe: {e}");
    }
    log::debug!("Unsubscribed from blocks");
}
}