use std::{num::ParseIntError, str::FromStr};
use alloy::primitives::{Address, I256, U160, U256};
use nautilus_core::{
UnixNanos,
datetime::{NANOSECONDS_IN_MICROSECOND, NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND},
};
use nautilus_model::{
defi::{
PoolLiquidityUpdate, PoolLiquidityUpdateType, PoolSwap, SharedChain, SharedDex,
data::{
DexPoolData, PoolFeeCollect, PoolFeeProtocolCollect, PoolFeeProtocolUpdate, PoolFlash,
},
validation::validate_address,
},
identifiers::InstrumentId,
};
use sqlx::{FromRow, Row, postgres::PgRow};
const MAX_UNIX_SECONDS_TIMESTAMP: u64 = 9_999_999_999;
const MAX_UNIX_MILLISECONDS_TIMESTAMP: u64 = MAX_UNIX_SECONDS_TIMESTAMP * 1_000 + 999;
const MAX_UNIX_MICROSECONDS_TIMESTAMP: u64 = MAX_UNIX_SECONDS_TIMESTAMP * 1_000_000 + 999_999;
#[derive(Debug)]
pub struct TokenRow {
pub address: Address,
pub name: String,
pub symbol: String,
pub decimals: i32,
}
impl<'r> FromRow<'r, PgRow> for TokenRow {
fn from_row(row: &'r PgRow) -> Result<Self, sqlx::Error> {
let address = validate_address(row.try_get::<String, _>("address")?.as_str()).unwrap();
let name = row.try_get::<String, _>("name")?;
let symbol = row.try_get::<String, _>("symbol")?;
let decimals = row.try_get::<i32, _>("decimals")?;
let token = Self {
address,
name,
symbol,
decimals,
};
Ok(token)
}
}
#[derive(Debug)]
pub struct PoolRow {
pub address: Address,
pub pool_identifier: String,
pub dex_name: String,
pub creation_block: i64,
pub creation_block_timestamp: Option<UnixNanos>,
pub token0_chain: i32,
pub token0_address: Address,
pub token1_chain: i32,
pub token1_address: Address,
pub fee: Option<i32>,
pub tick_spacing: Option<i32>,
pub initial_tick: Option<i32>,
pub initial_sqrt_price_x96: Option<String>,
pub hook_address: Option<String>,
}
impl<'r> FromRow<'r, PgRow> for PoolRow {
fn from_row(row: &'r PgRow) -> Result<Self, sqlx::Error> {
let address = validate_address(row.try_get::<String, _>("address")?.as_str()).unwrap();
let pool_identifier = row.try_get::<String, _>("pool_identifier")?;
let dex_name = row.try_get::<String, _>("dex_name")?;
let creation_block = row.try_get::<i64, _>("creation_block")?;
let creation_block_timestamp =
row.try_get::<Option<String>, _>("creation_block_timestamp")?;
let creation_block_timestamp = creation_block_timestamp
.as_deref()
.map(parse_cached_block_timestamp)
.transpose()
.map_err(|e| {
sqlx::Error::Decode(
format!("Invalid creation block timestamp '{creation_block_timestamp:?}': {e}")
.into(),
)
})?;
let token0_chain = row.try_get::<i32, _>("token0_chain")?;
let token0_address =
validate_address(row.try_get::<String, _>("token0_address")?.as_str()).unwrap();
let token1_chain = row.try_get::<i32, _>("token1_chain")?;
let token1_address =
validate_address(row.try_get::<String, _>("token1_address")?.as_str()).unwrap();
let fee = row.try_get::<Option<i32>, _>("fee")?;
let tick_spacing = row.try_get::<Option<i32>, _>("tick_spacing")?;
let initial_tick = row.try_get::<Option<i32>, _>("initial_tick")?;
let initial_sqrt_price_x96 = row.try_get::<Option<String>, _>("initial_sqrt_price_x96")?;
let hook_address = row.try_get::<Option<String>, _>("hook_address")?;
Ok(Self {
address,
pool_identifier,
dex_name,
creation_block,
creation_block_timestamp,
token0_chain,
token0_address,
token1_chain,
token1_address,
fee,
tick_spacing,
initial_tick,
initial_sqrt_price_x96,
hook_address,
})
}
}
#[derive(Debug)]
pub struct BlockTimestampRow {
pub number: u64,
pub timestamp: UnixNanos,
}
impl FromRow<'_, PgRow> for BlockTimestampRow {
fn from_row(row: &PgRow) -> Result<Self, sqlx::Error> {
let number = row.try_get::<i64, _>("number")? as u64;
let timestamp = row.try_get::<String, _>("timestamp")?;
let timestamp = parse_cached_block_timestamp(×tamp).map_err(|e| {
sqlx::Error::Decode(format!("Invalid block timestamp '{timestamp}': {e}").into())
})?;
Ok(Self { number, timestamp })
}
}
pub(crate) fn parse_cached_block_timestamp(value: &str) -> Result<UnixNanos, ParseIntError> {
let timestamp = value.parse::<u64>()?;
if timestamp <= MAX_UNIX_SECONDS_TIMESTAMP {
return Ok(UnixNanos::from(timestamp * NANOSECONDS_IN_SECOND));
}
if timestamp <= MAX_UNIX_MILLISECONDS_TIMESTAMP {
return Ok(UnixNanos::from(timestamp * NANOSECONDS_IN_MILLISECOND));
}
if timestamp <= MAX_UNIX_MICROSECONDS_TIMESTAMP {
return Ok(UnixNanos::from(timestamp * NANOSECONDS_IN_MICROSECOND));
}
Ok(UnixNanos::from(timestamp))
}
pub fn transform_row_to_dex_pool_data(
row: &PgRow,
chain: SharedChain,
dex: SharedDex,
instrument_id: InstrumentId,
) -> Result<DexPoolData, sqlx::Error> {
let event_type = row.try_get::<String, _>("event_type")?;
let pool_identifier_str = row.try_get::<String, _>("pool_identifier")?;
let pool_identifier = pool_identifier_str
.parse()
.map_err(|e| sqlx::Error::Decode(format!("Invalid pool identifier: {e}").into()))?;
let block = row.try_get::<i64, _>("block")? as u64;
let transaction_hash = row.try_get::<String, _>("transaction_hash")?;
let transaction_index = row.try_get::<i32, _>("transaction_index")? as u32;
let log_index = row.try_get::<i32, _>("log_index")? as u32;
let block_timestamp = row.try_get::<String, _>("block_timestamp")?;
let timestamp = parse_cached_block_timestamp(&block_timestamp).map_err(|e| {
sqlx::Error::Decode(format!("Invalid block timestamp '{block_timestamp}': {e}").into())
})?;
match event_type.as_str() {
"swap" => {
let sender_str = row
.try_get::<Option<String>, _>("sender")?
.ok_or_else(|| sqlx::Error::Decode("Missing sender for swap event".into()))?;
let sender = validate_address(&sender_str)
.map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
let recipient_str = row
.try_get::<Option<String>, _>("recipient")?
.ok_or_else(|| sqlx::Error::Decode("Missing recipient for swap event".into()))?;
let recipient = validate_address(&recipient_str)
.map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
let sqrt_price_x96_str = row
.try_get::<Option<String>, _>("sqrt_price_x96")?
.ok_or_else(|| {
sqlx::Error::Decode("Missing sqrt_price_x96 for swap event".into())
})?;
let sqrt_price_x96 = U160::from_str(&sqrt_price_x96_str).map_err(|e| {
sqlx::Error::Decode(
format!("Invalid sqrt_price_x96 '{sqrt_price_x96_str}': {e}").into(),
)
})?;
let swap_liquidity_str = row.try_get::<String, _>("swap_liquidity")?;
let swap_liquidity = u128::from_str(&swap_liquidity_str)
.map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
let swap_tick = row.try_get::<i32, _>("swap_tick")?;
let swap_amount0_str = row
.try_get::<Option<String>, _>("swap_amount0")?
.ok_or_else(|| sqlx::Error::Decode("Missing swap_amount0 for swap event".into()))?;
let amount0 = I256::from_str(&swap_amount0_str).map_err(|e| {
sqlx::Error::Decode(
format!("Invalid swap_amount0 '{swap_amount0_str}': {e}").into(),
)
})?;
let swap_amount1_str = row
.try_get::<Option<String>, _>("swap_amount1")?
.ok_or_else(|| sqlx::Error::Decode("Missing swap_amount1 for swap event".into()))?;
let amount1 = I256::from_str(&swap_amount1_str).map_err(|e| {
sqlx::Error::Decode(
format!("Invalid swap_amount1 '{swap_amount1_str}': {e}").into(),
)
})?;
let pool_swap = PoolSwap::new(
chain,
dex,
instrument_id,
pool_identifier,
block,
transaction_hash,
transaction_index,
log_index,
timestamp, timestamp, sender,
recipient,
amount0,
amount1,
sqrt_price_x96,
swap_liquidity,
swap_tick,
);
Ok(DexPoolData::Swap(pool_swap))
}
"liquidity" => {
let kind_str = row
.try_get::<Option<String>, _>("liquidity_event_type")?
.ok_or_else(|| {
sqlx::Error::Decode("Missing liquidity_event_type for liquidity event".into())
})?;
let kind = match kind_str.as_str() {
"Mint" => PoolLiquidityUpdateType::Mint,
"Burn" => PoolLiquidityUpdateType::Burn,
_ => {
return Err(sqlx::Error::Decode(
format!("Unknown liquidity update type: {kind_str}").into(),
));
}
};
let sender = row
.try_get::<Option<String>, _>("sender")?
.map(|s| validate_address(&s))
.transpose()
.map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
let owner_str = row
.try_get::<Option<String>, _>("owner")?
.ok_or_else(|| sqlx::Error::Decode("Missing owner for liquidity event".into()))?;
let owner = validate_address(&owner_str)
.map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
let position_liquidity_str = row.try_get::<String, _>("position_liquidity")?;
let position_liquidity = position_liquidity_str.parse::<u128>().map_err(|e| {
sqlx::Error::Decode(
format!("Invalid position_liquidity '{position_liquidity_str}': {e}").into(),
)
})?;
let amount0_str = row.try_get::<String, _>("amount0")?;
let amount0 = U256::from_str_radix(&amount0_str, 10).map_err(|e| {
sqlx::Error::Decode(format!("Invalid amount0 '{amount0_str}': {e}").into())
})?;
let amount1_str = row.try_get::<String, _>("amount1")?;
let amount1 = U256::from_str_radix(&amount1_str, 10).map_err(|e| {
sqlx::Error::Decode(format!("Invalid amount1 '{amount1_str}': {e}").into())
})?;
let tick_lower = row
.try_get::<Option<i32>, _>("tick_lower")?
.ok_or_else(|| {
sqlx::Error::Decode("Missing tick_lower for liquidity event".into())
})?;
let tick_upper = row
.try_get::<Option<i32>, _>("tick_upper")?
.ok_or_else(|| {
sqlx::Error::Decode("Missing tick_upper for liquidity event".into())
})?;
let pool_liquidity_update = PoolLiquidityUpdate::new(
chain,
dex,
instrument_id,
pool_identifier,
kind,
block,
transaction_hash,
transaction_index,
log_index,
sender,
owner,
position_liquidity,
amount0,
amount1,
tick_lower,
tick_upper,
timestamp, timestamp, );
Ok(DexPoolData::LiquidityUpdate(pool_liquidity_update))
}
"collect" => {
let owner_str = row
.try_get::<Option<String>, _>("owner")?
.ok_or_else(|| sqlx::Error::Decode("Missing owner for collect event".into()))?;
let owner = validate_address(&owner_str)
.map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
let amount0_str = row.try_get::<String, _>("amount0")?;
let amount0 = amount0_str.parse::<u128>().map_err(|e| {
sqlx::Error::Decode(format!("Invalid amount0 '{amount0_str}': {e}").into())
})?;
let amount1_str = row.try_get::<String, _>("amount1")?;
let amount1 = amount1_str.parse::<u128>().map_err(|e| {
sqlx::Error::Decode(format!("Invalid amount1 '{amount1_str}': {e}").into())
})?;
let tick_lower = row
.try_get::<Option<i32>, _>("tick_lower")?
.ok_or_else(|| {
sqlx::Error::Decode("Missing tick_lower for collect event".into())
})?;
let tick_upper = row
.try_get::<Option<i32>, _>("tick_upper")?
.ok_or_else(|| {
sqlx::Error::Decode("Missing tick_upper for collect event".into())
})?;
let pool_fee_collect = PoolFeeCollect::new(
chain,
dex,
instrument_id,
pool_identifier,
block,
transaction_hash,
transaction_index,
log_index,
owner,
amount0,
amount1,
tick_lower,
tick_upper,
timestamp, timestamp, );
Ok(DexPoolData::FeeCollect(pool_fee_collect))
}
"fee_protocol_update" => {
let fee_protocol0_new = row.try_get::<i16, _>("fee_protocol0_new")?;
let fee_protocol1_new = row.try_get::<i16, _>("fee_protocol1_new")?;
let fee_protocol0_new = u8::try_from(fee_protocol0_new).map_err(|e| {
sqlx::Error::Decode(
format!("Invalid fee_protocol0_new '{fee_protocol0_new}': {e}").into(),
)
})?;
let fee_protocol1_new = u8::try_from(fee_protocol1_new).map_err(|e| {
sqlx::Error::Decode(
format!("Invalid fee_protocol1_new '{fee_protocol1_new}': {e}").into(),
)
})?;
let pool_fee_protocol_update = PoolFeeProtocolUpdate::new(
chain,
dex,
instrument_id,
pool_identifier,
block,
transaction_hash,
transaction_index,
log_index,
fee_protocol0_new,
fee_protocol1_new,
timestamp, timestamp, );
Ok(DexPoolData::FeeProtocolUpdate(pool_fee_protocol_update))
}
"fee_protocol_collect" => {
let sender_str = row.try_get::<Option<String>, _>("sender")?.ok_or_else(|| {
sqlx::Error::Decode("Missing sender for fee_protocol_collect event".into())
})?;
let sender = validate_address(&sender_str)
.map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
let recipient_str =
row.try_get::<Option<String>, _>("recipient")?
.ok_or_else(|| {
sqlx::Error::Decode(
"Missing recipient for fee_protocol_collect event".into(),
)
})?;
let recipient = validate_address(&recipient_str)
.map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
let amount0_str = row.try_get::<String, _>("amount0")?;
let amount0 = amount0_str.parse::<u128>().map_err(|e| {
sqlx::Error::Decode(format!("Invalid amount0 '{amount0_str}': {e}").into())
})?;
let amount1_str = row.try_get::<String, _>("amount1")?;
let amount1 = amount1_str.parse::<u128>().map_err(|e| {
sqlx::Error::Decode(format!("Invalid amount1 '{amount1_str}': {e}").into())
})?;
let pool_fee_protocol_collect = PoolFeeProtocolCollect::new(
chain,
dex,
instrument_id,
pool_identifier,
block,
transaction_hash,
transaction_index,
log_index,
sender,
recipient,
amount0,
amount1,
timestamp, timestamp, );
Ok(DexPoolData::FeeProtocolCollect(pool_fee_protocol_collect))
}
"flash" => {
let sender_str = row
.try_get::<Option<String>, _>("sender")?
.ok_or_else(|| sqlx::Error::Decode("Missing sender for flash event".into()))?;
let sender = validate_address(&sender_str)
.map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
let recipient_str = row
.try_get::<Option<String>, _>("recipient")?
.ok_or_else(|| sqlx::Error::Decode("Missing recipient for flash event".into()))?;
let recipient = validate_address(&recipient_str)
.map_err(|e| sqlx::Error::Decode(e.to_string().into()))?;
let flash_amount0_str = row.try_get::<String, _>("flash_amount0")?;
let amount0 = U256::from_str_radix(&flash_amount0_str, 10).map_err(|e| {
sqlx::Error::Decode(
format!("Invalid flash_amount0 '{flash_amount0_str}': {e}").into(),
)
})?;
let flash_amount1_str = row.try_get::<String, _>("flash_amount1")?;
let amount1 = U256::from_str_radix(&flash_amount1_str, 10).map_err(|e| {
sqlx::Error::Decode(
format!("Invalid flash_amount1 '{flash_amount1_str}': {e}").into(),
)
})?;
let flash_paid0_str = row.try_get::<String, _>("flash_paid0")?;
let paid0 = U256::from_str_radix(&flash_paid0_str, 10).map_err(|e| {
sqlx::Error::Decode(format!("Invalid flash_paid0 '{flash_paid0_str}': {e}").into())
})?;
let flash_paid1_str = row.try_get::<String, _>("flash_paid1")?;
let paid1 = U256::from_str_radix(&flash_paid1_str, 10).map_err(|e| {
sqlx::Error::Decode(format!("Invalid flash_paid1 '{flash_paid1_str}': {e}").into())
})?;
let pool_flash = PoolFlash::new(
chain,
dex,
instrument_id,
pool_identifier,
block,
transaction_hash,
transaction_index,
log_index,
timestamp, timestamp, sender,
recipient,
amount0,
amount1,
paid0,
paid1,
);
Ok(DexPoolData::Flash(pool_flash))
}
_ => Err(sqlx::Error::Decode(
format!("Unknown event type: {event_type}").into(),
)),
}
}
#[cfg(test)]
mod tests {
use nautilus_core::datetime::{
NANOSECONDS_IN_MICROSECOND, NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND,
};
use rstest::rstest;
use super::*;
#[rstest]
#[case("1700000000", 1_700_000_000 * NANOSECONDS_IN_SECOND)]
#[case("9999999999", 9_999_999_999 * NANOSECONDS_IN_SECOND)]
#[case("1700000000123", 1_700_000_000_123 * NANOSECONDS_IN_MILLISECOND)]
#[case("9999999999999", 9_999_999_999_999 * NANOSECONDS_IN_MILLISECOND)]
#[case("1700000000123456", 1_700_000_000_123_456 * NANOSECONDS_IN_MICROSECOND)]
#[case("9999999999999999", 9_999_999_999_999_999 * NANOSECONDS_IN_MICROSECOND)]
#[case("1700000000123456789", 1_700_000_000_123_456_789)]
fn parse_cached_block_timestamp_returns_unix_nanos(#[case] value: &str, #[case] expected: u64) {
let timestamp = parse_cached_block_timestamp(value).unwrap();
assert_eq!(timestamp, UnixNanos::from(expected));
}
#[rstest]
fn parse_cached_block_timestamp_rejects_invalid_text() {
let result = parse_cached_block_timestamp("not-a-timestamp");
assert!(result.is_err());
}
}