use nautilus_model::defi::{
Block, Pool, PoolLiquidityUpdate, PoolSwap, Token, data::PoolFeeCollect,
};
use sqlx::{PgPool, postgres::PgPoolCopyExt};
/// Normalizes a numeric string to plain (non-exponent) decimal notation.
///
/// `rust_decimal`'s `FromStr` implementation rejects exponent notation, so the
/// original `from_str`-only version returned scientific inputs (e.g. `"1.5e18"`)
/// unchanged — defeating the purpose of this function. Plain decimal strings are
/// parsed with `from_str`; on failure we retry with [`Decimal::from_scientific`],
/// which handles the `e`/`E` exponent forms. Strings neither parser accepts
/// (malformed, or exceeding `Decimal`'s 96-bit range) are returned unchanged as
/// a best-effort fallback, preserving the previous behavior for those cases.
fn format_scientific_to_decimal(s: &str) -> String {
    use std::str::FromStr;

    use rust_decimal::Decimal;

    Decimal::from_str(s)
        .or_else(|_| Decimal::from_scientific(s))
        .map_or_else(|_| s.to_string(), |decimal| decimal.to_string())
}
/// Renders any `ToString` value for the text protocol: strips a leading `+`
/// sign and routes exponent-notation strings through
/// `format_scientific_to_decimal` so the database receives plain decimals.
fn format_numeric<T: ToString>(value: &T) -> String {
    let rendered = value.to_string();
    let trimmed = rendered.trim_start_matches('+');
    if trimmed.contains('e') || trimmed.contains('E') {
        format_scientific_to_decimal(trimmed)
    } else {
        trimmed.to_string()
    }
}
/// Bulk-loads DeFi domain objects into PostgreSQL using the binary `COPY` protocol.
///
/// Borrows a [`PgPool`] for lifetime `'a`; each `copy_*` method streams one batch
/// of rows through a single `COPY ... FROM STDIN WITH (FORMAT BINARY)` statement.
#[derive(Debug)]
pub struct PostgresCopyHandler<'a> {
// Connection pool the COPY statements are issued against.
pool: &'a PgPool,
}
impl<'a> PostgresCopyHandler<'a> {
/// Creates a handler that issues COPY operations against the given pool.
#[must_use]
pub const fn new(pool: &'a PgPool) -> Self {
Self { pool }
}
/// Bulk-inserts a batch of blocks into the `block` table via binary `COPY`.
///
/// No-op for an empty batch.
///
/// # Errors
///
/// Returns an error if the COPY stream cannot be started, written, or finished.
pub async fn copy_blocks(&self, chain_id: u32, blocks: &[Block]) -> anyhow::Result<()> {
    if blocks.is_empty() {
        return Ok(());
    }
    let statement = r"
COPY block (
chain_id, number, hash, parent_hash, miner, gas_limit, gas_used, timestamp,
base_fee_per_gas, blob_gas_used, excess_blob_gas,
l1_gas_price, l1_gas_used, l1_fee_scalar
) FROM STDIN WITH (FORMAT BINARY)";
    let mut sink = self
        .pool
        .copy_in_raw(statement)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
    // Binary COPY stream: header, one tuple per block, then the end-of-data trailer.
    self.write_copy_header(&mut sink).await?;
    for block in blocks {
        self.write_block_binary(&mut sink, chain_id, block).await?;
    }
    self.write_copy_trailer(&mut sink).await?;
    sink.finish()
        .await
        .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
    Ok(())
}
/// Bulk-inserts a batch of tokens into the `token` table via binary `COPY`.
///
/// No-op for an empty batch.
///
/// # Errors
///
/// Returns an error if the COPY stream cannot be started, written, or finished.
pub async fn copy_tokens(&self, chain_id: u32, tokens: &[Token]) -> anyhow::Result<()> {
    if tokens.is_empty() {
        return Ok(());
    }
    let statement = r"
COPY token (
chain_id, address, name, symbol, decimals
) FROM STDIN WITH (FORMAT BINARY)";
    let mut sink = self
        .pool
        .copy_in_raw(statement)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
    // Binary COPY stream: header, one tuple per token, then the end-of-data trailer.
    self.write_copy_header(&mut sink).await?;
    for token in tokens {
        self.write_token_binary(&mut sink, chain_id, token).await?;
    }
    self.write_copy_trailer(&mut sink).await?;
    sink.finish()
        .await
        .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
    Ok(())
}
/// Bulk-inserts a batch of pools into the `pool` table via binary `COPY`.
///
/// No-op for an empty batch.
///
/// # Errors
///
/// Returns an error if the COPY stream cannot be started, written, or finished.
pub async fn copy_pools(&self, chain_id: u32, pools: &[Pool]) -> anyhow::Result<()> {
    if pools.is_empty() {
        return Ok(());
    }
    let statement = r"
COPY pool (
chain_id, dex_name, address, pool_identifier, creation_block,
token0_chain, token0_address, token1_chain, token1_address,
fee, tick_spacing, initial_tick, initial_sqrt_price_x96, hook_address
) FROM STDIN WITH (FORMAT BINARY)";
    let mut sink = self
        .pool
        .copy_in_raw(statement)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
    // Binary COPY stream: header, one tuple per pool, then the end-of-data trailer.
    self.write_copy_header(&mut sink).await?;
    for pool in pools {
        self.write_pool_binary(&mut sink, chain_id, pool).await?;
    }
    self.write_copy_trailer(&mut sink).await?;
    sink.finish()
        .await
        .map_err(|e| anyhow::anyhow!("Failed to finish COPY operation: {e}"))?;
    Ok(())
}
/// Bulk-inserts a batch of swap events into `pool_swap_event` via binary `COPY`.
///
/// No-op for an empty batch. If `finish` fails, logs diagnostic context
/// (chain, batch size, block range, the first few swaps) before propagating
/// the error.
pub async fn copy_pool_swaps(&self, chain_id: u32, swaps: &[PoolSwap]) -> anyhow::Result<()> {
if swaps.is_empty() {
return Ok(());
}
let copy_statement = r"
COPY pool_swap_event (
chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
log_index, sender, recipient, sqrt_price_x96, liquidity, tick, amount0, amount1,
order_side, base_quantity, quote_quantity, spot_price, execution_price
) FROM STDIN WITH (FORMAT BINARY)";
let mut copy_in = self
.pool
.copy_in_raw(copy_statement)
.await
.map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
// Binary COPY stream: header, one tuple per swap, then the end-of-data trailer.
self.write_copy_header(&mut copy_in).await?;
for swap in swaps {
self.write_pool_swap_binary(&mut copy_in, chain_id, swap)
.await?;
}
self.write_copy_trailer(&mut copy_in).await?;
// `finish` is where server-side COPY errors surface; log batch diagnostics
// to aid debugging before returning the error.
copy_in.finish().await.map_err(|e| {
log::error!("COPY operation failed for pool_swap batch:");
log::error!(" Chain ID: {chain_id}");
log::error!(" Batch size: {}", swaps.len());
if !swaps.is_empty() {
log::error!(
" Block range: {} to {}",
swaps.iter().map(|s| s.block).min().unwrap_or(0),
swaps.iter().map(|s| s.block).max().unwrap_or(0)
);
}
// Only the first five swaps are logged to keep output bounded.
for (i, swap) in swaps.iter().take(5).enumerate() {
log::error!(
" Swap[{}]: tx={} log_idx={} block={} pool={}",
i,
swap.transaction_hash,
swap.log_index,
swap.block,
swap.instrument_id
);
}
if swaps.len() > 5 {
log::error!(" ... and {} more swaps", swaps.len() - 5);
}
anyhow::anyhow!("Failed to finish COPY operation: {e}")
})?;
Ok(())
}
/// Bulk-inserts a batch of liquidity events into `pool_liquidity_event` via
/// binary `COPY`.
///
/// No-op for an empty batch. If `finish` fails, logs diagnostic context
/// (chain, batch size, block range, the first few updates) before propagating
/// the error.
pub async fn copy_pool_liquidity_updates(
&self,
chain_id: u32,
updates: &[PoolLiquidityUpdate],
) -> anyhow::Result<()> {
if updates.is_empty() {
return Ok(());
}
let copy_statement = r"
COPY pool_liquidity_event (
chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
log_index, event_type, sender, owner, position_liquidity,
amount0, amount1, tick_lower, tick_upper
) FROM STDIN WITH (FORMAT BINARY)";
let mut copy_in = self
.pool
.copy_in_raw(copy_statement)
.await
.map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
// Binary COPY stream: header, one tuple per update, then the end-of-data trailer.
self.write_copy_header(&mut copy_in).await?;
for update in updates {
self.write_pool_liquidity_update_binary(&mut copy_in, chain_id, update)
.await?;
}
self.write_copy_trailer(&mut copy_in).await?;
// `finish` is where server-side COPY errors surface; log batch diagnostics
// to aid debugging before returning the error.
copy_in.finish().await.map_err(|e| {
log::error!("COPY operation failed for pool_liquidity batch:");
log::error!(" Chain ID: {chain_id}");
log::error!(" Batch size: {}", updates.len());
if !updates.is_empty() {
log::error!(
" Block range: {} to {}",
updates.iter().map(|u| u.block).min().unwrap_or(0),
updates.iter().map(|u| u.block).max().unwrap_or(0)
);
}
// Only the first five updates are logged to keep output bounded.
for (i, update) in updates.iter().take(5).enumerate() {
log::error!(
" Update[{}]: tx={} log_idx={} block={} pool={} type={}",
i,
update.transaction_hash,
update.log_index,
update.block,
update.pool_identifier,
update.kind
);
}
if updates.len() > 5 {
log::error!(" ... and {} more updates", updates.len() - 5);
}
anyhow::anyhow!("Failed to finish COPY operation: {e}")
})?;
Ok(())
}
/// Sends the 19-byte binary COPY header: the 11-byte `PGCOPY` signature, a
/// 4-byte flags field (all zero), and a 4-byte header-extension length (zero).
async fn write_copy_header(
    &self,
    copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
) -> anyhow::Result<()> {
    let mut header = Vec::with_capacity(19);
    header.extend_from_slice(b"PGCOPY\n\xff\r\n\0");
    header.extend_from_slice(&0u32.to_be_bytes()); // flags: no OIDs
    header.extend_from_slice(&0u32.to_be_bytes()); // header extension length
    copy_in
        .send(header)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to write COPY header: {e}"))?;
    Ok(())
}
async fn write_block_binary(
&self,
copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
chain_id: u32,
block: &Block,
) -> anyhow::Result<()> {
use std::io::Write;
let mut row_data = Vec::new();
row_data.write_all(&14u16.to_be_bytes())?;
let chain_id_bytes = (chain_id as i32).to_be_bytes();
row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&chain_id_bytes)?;
let number_bytes = (block.number as i64).to_be_bytes();
row_data.write_all(&(number_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&number_bytes)?;
let hash_bytes = block.hash.as_bytes();
row_data.write_all(&(hash_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(hash_bytes)?;
let parent_hash_bytes = block.parent_hash.as_bytes();
row_data.write_all(&(parent_hash_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(parent_hash_bytes)?;
let miner_bytes = block.miner.to_string().as_bytes().to_vec();
row_data.write_all(&(miner_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&miner_bytes)?;
let gas_limit_bytes = (block.gas_limit as i64).to_be_bytes();
row_data.write_all(&(gas_limit_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&gas_limit_bytes)?;
let gas_used_bytes = (block.gas_used as i64).to_be_bytes();
row_data.write_all(&(gas_used_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&gas_used_bytes)?;
let timestamp_bytes = block.timestamp.to_string().as_bytes().to_vec();
row_data.write_all(&(timestamp_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(×tamp_bytes)?;
if let Some(ref base_fee) = block.base_fee_per_gas {
let base_fee_bytes = base_fee.to_string().as_bytes().to_vec();
row_data.write_all(&(base_fee_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&base_fee_bytes)?;
} else {
row_data.write_all(&(-1i32).to_be_bytes())?; }
if let Some(ref blob_gas) = block.blob_gas_used {
let blob_gas_bytes = blob_gas.to_string().as_bytes().to_vec();
row_data.write_all(&(blob_gas_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&blob_gas_bytes)?;
} else {
row_data.write_all(&(-1i32).to_be_bytes())?; }
if let Some(ref excess_blob) = block.excess_blob_gas {
let excess_blob_bytes = excess_blob.to_string().as_bytes().to_vec();
row_data.write_all(&(excess_blob_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&excess_blob_bytes)?;
} else {
row_data.write_all(&(-1i32).to_be_bytes())?; }
if let Some(ref l1_gas_price) = block.l1_gas_price {
let l1_gas_price_bytes = l1_gas_price.to_string().as_bytes().to_vec();
row_data.write_all(&(l1_gas_price_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&l1_gas_price_bytes)?;
} else {
row_data.write_all(&(-1i32).to_be_bytes())?; }
if let Some(l1_gas_used) = block.l1_gas_used {
let l1_gas_used_bytes = (l1_gas_used as i64).to_be_bytes();
row_data.write_all(&(l1_gas_used_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&l1_gas_used_bytes)?;
} else {
row_data.write_all(&(-1i32).to_be_bytes())?; }
if let Some(l1_fee_scalar) = block.l1_fee_scalar {
let l1_fee_scalar_bytes = (l1_fee_scalar as i64).to_be_bytes();
row_data.write_all(&(l1_fee_scalar_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&l1_fee_scalar_bytes)?;
} else {
row_data.write_all(&(-1i32).to_be_bytes())?; }
copy_in
.send(row_data)
.await
.map_err(|e| anyhow::anyhow!("Failed to write block data: {e}"))?;
Ok(())
}
/// Serializes one [`PoolSwap`] as a 19-field binary COPY tuple and sends it.
///
/// Tuple layout: big-endian `i16` field count, then per field a big-endian
/// `i32` byte length (`-1` marks SQL NULL) followed by the raw field bytes.
/// The five trade-derived columns (`order_side` .. `execution_price`) are all
/// NULL when `swap.trade_info` is absent.
async fn write_pool_swap_binary(
    &self,
    copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
    chain_id: u32,
    swap: &PoolSwap,
) -> anyhow::Result<()> {
    // Appends one length-prefixed field; writes into a `Vec` cannot fail.
    fn put(buf: &mut Vec<u8>, field: &[u8]) {
        buf.extend_from_slice(&(field.len() as i32).to_be_bytes());
        buf.extend_from_slice(field);
    }
    // Appends the SQL NULL marker: length -1, no payload.
    fn put_null(buf: &mut Vec<u8>) {
        buf.extend_from_slice(&(-1i32).to_be_bytes());
    }

    let mut row = Vec::new();
    row.extend_from_slice(&19u16.to_be_bytes());
    put(&mut row, &(chain_id as i32).to_be_bytes());
    put(&mut row, swap.dex.name.to_string().as_bytes());
    put(&mut row, swap.instrument_id.to_string().as_bytes());
    put(&mut row, &(swap.block as i64).to_be_bytes());
    put(&mut row, swap.transaction_hash.as_bytes());
    put(&mut row, &(swap.transaction_index as i32).to_be_bytes());
    put(&mut row, &(swap.log_index as i32).to_be_bytes());
    put(&mut row, swap.sender.to_string().as_bytes());
    put(&mut row, swap.recipient.to_string().as_bytes());
    // Big-number columns go over the wire as plain-decimal text.
    put(&mut row, format_numeric(&swap.sqrt_price_x96).as_bytes());
    put(&mut row, format_numeric(&swap.liquidity).as_bytes());
    put(&mut row, &swap.tick.to_be_bytes());
    put(&mut row, format_numeric(&swap.amount0).as_bytes());
    put(&mut row, format_numeric(&swap.amount1).as_bytes());
    if let Some(trade_info) = &swap.trade_info {
        put(&mut row, trade_info.order_side.to_string().as_bytes());
        put(
            &mut row,
            trade_info.quantity_base.as_decimal().to_string().as_bytes(),
        );
        put(
            &mut row,
            trade_info.quantity_quote.as_decimal().to_string().as_bytes(),
        );
        put(
            &mut row,
            trade_info.spot_price.as_decimal().to_string().as_bytes(),
        );
        put(
            &mut row,
            trade_info.execution_price.as_decimal().to_string().as_bytes(),
        );
    } else {
        // No trade info: NULL for all five derived columns.
        for _ in 0..5 {
            put_null(&mut row);
        }
    }
    copy_in
        .send(row)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to write pool swap data: {e}"))?;
    Ok(())
}
/// Serializes one [`PoolLiquidityUpdate`] as a 15-field binary COPY tuple and
/// sends it.
///
/// Tuple layout: big-endian `i16` field count, then per field a big-endian
/// `i32` byte length (`-1` marks SQL NULL) followed by the raw field bytes.
/// Only `sender` is nullable.
async fn write_pool_liquidity_update_binary(
    &self,
    copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
    chain_id: u32,
    update: &PoolLiquidityUpdate,
) -> anyhow::Result<()> {
    // Appends one length-prefixed field; writes into a `Vec` cannot fail.
    fn put(buf: &mut Vec<u8>, field: &[u8]) {
        buf.extend_from_slice(&(field.len() as i32).to_be_bytes());
        buf.extend_from_slice(field);
    }
    // Appends the SQL NULL marker: length -1, no payload.
    fn put_null(buf: &mut Vec<u8>) {
        buf.extend_from_slice(&(-1i32).to_be_bytes());
    }

    let mut row = Vec::new();
    row.extend_from_slice(&15u16.to_be_bytes());
    put(&mut row, &(chain_id as i32).to_be_bytes());
    put(&mut row, update.dex.name.to_string().as_bytes());
    put(&mut row, update.instrument_id.to_string().as_bytes());
    put(&mut row, &(update.block as i64).to_be_bytes());
    put(&mut row, update.transaction_hash.as_bytes());
    put(&mut row, &(update.transaction_index as i32).to_be_bytes());
    put(&mut row, &(update.log_index as i32).to_be_bytes());
    put(&mut row, update.kind.to_string().as_bytes());
    match update.sender {
        Some(sender) => put(&mut row, sender.to_string().as_bytes()),
        None => put_null(&mut row),
    }
    put(&mut row, update.owner.to_string().as_bytes());
    // Big-number columns go over the wire as plain-decimal text.
    put(&mut row, format_numeric(&update.position_liquidity).as_bytes());
    put(&mut row, format_numeric(&update.amount0).as_bytes());
    put(&mut row, format_numeric(&update.amount1).as_bytes());
    put(&mut row, &update.tick_lower.to_be_bytes());
    put(&mut row, &update.tick_upper.to_be_bytes());
    copy_in
        .send(row)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to write pool liquidity update data: {e}"))?;
    Ok(())
}
/// Bulk-inserts a batch of fee-collect events into `pool_collect_event` via
/// binary `COPY`.
///
/// No-op for an empty batch. If `finish` fails, logs diagnostic context
/// (chain, batch size, block range, the first few collects) before propagating
/// the error.
pub async fn copy_pool_collects(
&self,
chain_id: u32,
collects: &[PoolFeeCollect],
) -> anyhow::Result<()> {
if collects.is_empty() {
return Ok(());
}
let copy_statement = r"
COPY pool_collect_event (
chain_id, dex_name, pool_identifier, block, transaction_hash, transaction_index,
log_index, owner, amount0, amount1, tick_lower, tick_upper
) FROM STDIN WITH (FORMAT BINARY)";
let mut copy_in = self
.pool
.copy_in_raw(copy_statement)
.await
.map_err(|e| anyhow::anyhow!("Failed to start COPY operation: {e}"))?;
// Binary COPY stream: header, one tuple per collect, then the end-of-data trailer.
self.write_copy_header(&mut copy_in).await?;
for collect in collects {
self.write_pool_fee_collect_binary(&mut copy_in, chain_id, collect)
.await?;
}
self.write_copy_trailer(&mut copy_in).await?;
// `finish` is where server-side COPY errors surface; log batch diagnostics
// to aid debugging before returning the error.
copy_in.finish().await.map_err(|e| {
log::error!("COPY operation failed for temp_pool_collect batch:");
log::error!(" Chain ID: {chain_id}");
log::error!(" Batch size: {}", collects.len());
if !collects.is_empty() {
log::error!(
" Block range: {} to {}",
collects.iter().map(|c| c.block).min().unwrap_or(0),
collects.iter().map(|c| c.block).max().unwrap_or(0)
);
}
// Only the first five collects are logged to keep output bounded.
for (i, collect) in collects.iter().take(5).enumerate() {
log::error!(
" Collect[{}]: tx={} log_idx={} block={} pool={} owner={}",
i,
collect.transaction_hash,
collect.log_index,
collect.block,
collect.pool_identifier,
collect.owner
);
}
if collects.len() > 5 {
log::error!(" ... and {} more collects", collects.len() - 5);
}
anyhow::anyhow!("Failed to finish COPY operation: {e}")
})?;
Ok(())
}
/// Serializes one [`PoolFeeCollect`] as a 12-field binary COPY tuple and sends it.
///
/// Tuple layout: big-endian `i16` field count, then per field a big-endian
/// `i32` byte length followed by the raw field bytes. No field is nullable.
async fn write_pool_fee_collect_binary(
    &self,
    copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
    chain_id: u32,
    collect: &PoolFeeCollect,
) -> anyhow::Result<()> {
    // Appends one length-prefixed field; writes into a `Vec` cannot fail.
    fn put(buf: &mut Vec<u8>, field: &[u8]) {
        buf.extend_from_slice(&(field.len() as i32).to_be_bytes());
        buf.extend_from_slice(field);
    }

    let mut row = Vec::new();
    row.extend_from_slice(&12u16.to_be_bytes());
    put(&mut row, &(chain_id as i32).to_be_bytes());
    put(&mut row, collect.dex.name.to_string().as_bytes());
    put(&mut row, collect.instrument_id.to_string().as_bytes());
    put(&mut row, &(collect.block as i64).to_be_bytes());
    put(&mut row, collect.transaction_hash.as_bytes());
    put(&mut row, &(collect.transaction_index as i32).to_be_bytes());
    put(&mut row, &(collect.log_index as i32).to_be_bytes());
    put(&mut row, collect.owner.to_string().as_bytes());
    // Collected fee amounts go over the wire as plain-decimal text.
    put(&mut row, format_numeric(&collect.amount0).as_bytes());
    put(&mut row, format_numeric(&collect.amount1).as_bytes());
    put(&mut row, &collect.tick_lower.to_be_bytes());
    put(&mut row, &collect.tick_upper.to_be_bytes());
    copy_in
        .send(row)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to write pool fee collect data: {e}"))?;
    Ok(())
}
/// Serializes one [`Token`] as a 5-field binary COPY tuple and sends it.
///
/// Tuple layout: big-endian `i16` field count, then per field a big-endian
/// `i32` byte length followed by the raw field bytes. No field is nullable.
async fn write_token_binary(
    &self,
    copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
    chain_id: u32,
    token: &Token,
) -> anyhow::Result<()> {
    // Appends one length-prefixed field; writes into a `Vec` cannot fail.
    fn put(buf: &mut Vec<u8>, field: &[u8]) {
        buf.extend_from_slice(&(field.len() as i32).to_be_bytes());
        buf.extend_from_slice(field);
    }

    let mut row = Vec::new();
    row.extend_from_slice(&5u16.to_be_bytes());
    put(&mut row, &(chain_id as i32).to_be_bytes());
    put(&mut row, token.address.to_string().as_bytes());
    put(&mut row, token.name.as_bytes());
    put(&mut row, token.symbol.as_bytes());
    // Widen u8 decimals to the column's int type.
    put(&mut row, &i32::from(token.decimals).to_be_bytes());
    copy_in
        .send(row)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to write token data: {e}"))?;
    Ok(())
}
async fn write_pool_binary(
&self,
copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
chain_id: u32,
pool: &Pool,
) -> anyhow::Result<()> {
use std::io::Write;
let mut row_data = Vec::new();
row_data.write_all(&14u16.to_be_bytes())?;
let chain_id_bytes = (chain_id as i32).to_be_bytes();
row_data.write_all(&(chain_id_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&chain_id_bytes)?;
let dex_name_bytes = pool.dex.name.to_string().as_bytes().to_vec();
row_data.write_all(&(dex_name_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&dex_name_bytes)?;
let address_bytes = pool.address.to_string().as_bytes().to_vec();
row_data.write_all(&(address_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&address_bytes)?;
let pool_identifier_bytes = pool.pool_identifier.as_str().as_bytes();
row_data.write_all(&(pool_identifier_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(pool_identifier_bytes)?;
let creation_block_bytes = (pool.creation_block as i64).to_be_bytes();
row_data.write_all(&(creation_block_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&creation_block_bytes)?;
let token0_chain_bytes = (pool.token0.chain.chain_id as i32).to_be_bytes();
row_data.write_all(&(token0_chain_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&token0_chain_bytes)?;
let token0_address_bytes = pool.token0.address.to_string().as_bytes().to_vec();
row_data.write_all(&(token0_address_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&token0_address_bytes)?;
let token1_chain_bytes = (pool.token1.chain.chain_id as i32).to_be_bytes();
row_data.write_all(&(token1_chain_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&token1_chain_bytes)?;
let token1_address_bytes = pool.token1.address.to_string().as_bytes().to_vec();
row_data.write_all(&(token1_address_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&token1_address_bytes)?;
if let Some(fee) = pool.fee {
let fee_bytes = (fee as i32).to_be_bytes();
row_data.write_all(&(fee_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&fee_bytes)?;
} else {
row_data.write_all(&(-1i32).to_be_bytes())?; }
if let Some(tick_spacing) = pool.tick_spacing {
let tick_spacing_bytes = (tick_spacing as i32).to_be_bytes();
row_data.write_all(&(tick_spacing_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&tick_spacing_bytes)?;
} else {
row_data.write_all(&(-1i32).to_be_bytes())?; }
if let Some(initial_tick) = pool.initial_tick {
let initial_tick_bytes = initial_tick.to_be_bytes();
row_data.write_all(&(initial_tick_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&initial_tick_bytes)?;
} else {
row_data.write_all(&(-1i32).to_be_bytes())?; }
if let Some(ref initial_sqrt_price) = pool.initial_sqrt_price_x96 {
let sqrt_price_bytes = format_numeric(initial_sqrt_price).as_bytes().to_vec();
row_data.write_all(&(sqrt_price_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&sqrt_price_bytes)?;
} else {
row_data.write_all(&(-1i32).to_be_bytes())?; }
if let Some(ref hooks) = pool.hooks {
let hooks_bytes = hooks.to_string().as_bytes().to_vec();
row_data.write_all(&(hooks_bytes.len() as i32).to_be_bytes())?;
row_data.write_all(&hooks_bytes)?;
} else {
row_data.write_all(&(-1i32).to_be_bytes())?; }
copy_in
.send(row_data)
.await
.map_err(|e| anyhow::anyhow!("Failed to write pool data: {e}"))?;
Ok(())
}
/// Sends the binary COPY end-of-data marker: a tuple field count of `-1`.
async fn write_copy_trailer(
    &self,
    copy_in: &mut sqlx::postgres::PgCopyIn<sqlx::pool::PoolConnection<sqlx::Postgres>>,
) -> anyhow::Result<()> {
    let end_marker = (-1i16).to_be_bytes().to_vec();
    copy_in
        .send(end_marker)
        .await
        .map_err(|e| anyhow::anyhow!("Failed to write COPY trailer: {e}"))?;
    Ok(())
}
}