use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use futures_util::{SinkExt, StreamExt};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::mpsc;
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use crate::error::ChainStreamError;
use crate::openapi::types::Resolution;
use super::fields::replace_filter_fields;
use super::models::*;
/// Boxed callback alias: any `Fn(T)` that is `Send + Sync + 'static`.
pub type StreamCallback<T> = Box<dyn Fn(T) + Send + Sync + 'static>;
/// Handle returned by the `subscribe*` methods; it identifies one registered
/// callback on one channel so that it can be removed later.
pub struct Unsubscribe {
/// Channel the callback was registered on.
channel: String,
/// Id assigned to the callback when it was registered.
callback_id: u64,
/// Shared connection state on which the removal is performed.
api: Arc<StreamApiInner>,
}
impl Unsubscribe {
pub fn unsubscribe(self) {
self.api.unsubscribe(&self.channel, self.callback_id);
}
}
/// Outgoing command envelope; it is serialized to JSON and sent as a text frame.
///
/// `connect`, `subscribe` and `unsubscribe` are left out of the JSON when `None`.
#[derive(Debug, Serialize)]
struct CentrifugeCommand {
/// Command id; the code in this file takes it from `StreamApiInner::next_command_id`.
id: u64,
/// Connect request payload, if this is a connect command.
#[serde(skip_serializing_if = "Option::is_none")]
connect: Option<ConnectRequest>,
/// Subscribe request payload, if this is a subscribe command.
#[serde(skip_serializing_if = "Option::is_none")]
subscribe: Option<SubscribeRequest>,
/// Unsubscribe request payload, if this is an unsubscribe command.
#[serde(skip_serializing_if = "Option::is_none")]
unsubscribe: Option<UnsubscribeRequest>,
}
/// Payload of the connect command.
#[derive(Debug, Serialize)]
struct ConnectRequest {
/// Access token presented to the server.
token: String,
}
/// Payload of the subscribe command.
#[derive(Debug, Serialize)]
struct SubscribeRequest {
/// Channel to subscribe to.
channel: String,
/// Delta mode requested for the channel; omitted from the JSON when `None`.
/// `StreamApiInner::send_subscribe` always sets it to `"fossil"`.
#[serde(skip_serializing_if = "Option::is_none")]
delta: Option<String>,
/// Optional filter expression; omitted from the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
filter: Option<String>,
}
/// Payload of the unsubscribe command.
#[derive(Debug, Serialize)]
struct UnsubscribeRequest {
/// Channel to unsubscribe from.
channel: String,
}
/// Incoming message envelope deserialized from a text frame.
///
/// Every field falls back to its default (`0` / `None`) when absent from the JSON.
#[derive(Debug, Deserialize)]
// `id`, `connect` and `subscribe` are deserialized but not read by the code in this file.
#[allow(dead_code)]
struct CentrifugeResponse {
/// Id of the command this message replies to; `0` when absent.
#[serde(default)]
id: u64,
/// Raw reply to a connect command, if present.
#[serde(default)]
connect: Option<Value>,
/// Raw reply to a subscribe command, if present.
#[serde(default)]
subscribe: Option<Value>,
/// Server push carrying a channel publication, if present.
#[serde(default)]
push: Option<PushData>,
/// Error reported by the server, if present.
#[serde(default)]
error: Option<ErrorData>,
}
/// A server push addressed to one channel.
#[derive(Debug, Deserialize)]
struct PushData {
/// Channel the push belongs to; used to look up the listeners to notify.
channel: String,
/// Publication payload, read from the JSON key `pub`; `None` when absent.
#[serde(rename = "pub")]
pub_data: Option<PublicationData>,
}
/// Body of a publication inside a push.
#[derive(Debug, Deserialize)]
struct PublicationData {
/// Untyped JSON payload handed to the channel's callbacks.
data: Value,
}
/// Error object carried by an incoming message; the reader task logs both fields.
#[derive(Debug, Deserialize)]
struct ErrorData {
/// Numeric error code.
code: i32,
/// Human-readable error message.
message: String,
}
/// A registered listener: a callback together with the id used to remove it again.
struct CallbackWrapper {
/// Id handed out by `StreamApiInner::next_callback_id`.
id: u64,
/// Callback invoked with a clone of each payload dispatched to the channel.
callback: Box<dyn Fn(Value) + Send + Sync>,
}
/// Connection state shared between `StreamApi`, its spawned tasks and `Unsubscribe` handles.
struct StreamApiInner {
/// WebSocket URL passed to `connect_async`; `StreamApi::new` stores it with
/// the access token already appended as a `token` query parameter.
url: String,
/// Access token sent in the connect command.
access_token: String,
/// Connection flag reported by `StreamApi::is_connected`.
connected: AtomicBool,
/// Counter for outgoing command ids; starts at 1.
command_id: AtomicU64,
/// Counter for listener ids; starts at 1.
callback_id: AtomicU64,
/// Registered callbacks, keyed by channel name.
listeners: RwLock<HashMap<String, Vec<CallbackWrapper>>>,
/// Channels recorded as subscribed, each mapped to a command id drawn when
/// the entry was inserted. The stored id is not read anywhere in this file.
subscriptions: RwLock<HashMap<String, u64>>,
/// Sender feeding the writer task; `None` until `connect` installs one and
/// again after `disconnect` takes it.
command_tx: RwLock<Option<mpsc::UnboundedSender<Message>>>,
}
impl StreamApiInner {
/// Builds the initial, disconnected state.
///
/// Both id counters start at 1, the listener and subscription maps start
/// empty, and no command sender is installed yet.
fn new(url: String, access_token: String) -> Self {
    const FIRST_ID: u64 = 1;
    let listeners = RwLock::new(HashMap::new());
    let subscriptions = RwLock::new(HashMap::new());
    Self {
        url,
        access_token,
        connected: AtomicBool::new(false),
        command_id: AtomicU64::new(FIRST_ID),
        callback_id: AtomicU64::new(FIRST_ID),
        listeners,
        subscriptions,
        command_tx: RwLock::new(None),
    }
}
/// Returns the current command id and advances the counter by one.
fn next_command_id(&self) -> u64 {
    let issued = self.command_id.fetch_add(1, Ordering::SeqCst);
    issued
}
/// Returns the current callback id and advances the counter by one.
fn next_callback_id(&self) -> u64 {
    let issued = self.callback_id.fetch_add(1, Ordering::SeqCst);
    issued
}
/// Registers `callback` for `channel` and returns the id assigned to it.
///
/// The id is drawn before the listener map is locked; the callback is
/// appended after any callbacks already registered for the channel.
fn add_listener<F>(&self, channel: &str, callback: F) -> u64
where
    F: Fn(Value) + Send + Sync + 'static,
{
    let id = self.next_callback_id();
    let boxed: Box<dyn Fn(Value) + Send + Sync> = Box::new(callback);
    let mut registry = self.listeners.write();
    let slot = registry.entry(channel.to_string()).or_default();
    slot.push(CallbackWrapper {
        id,
        callback: boxed,
    });
    id
}
/// Removes the callback `callback_id` from `channel`.
///
/// When that leaves the channel without callbacks, the channel entry is
/// removed, an unsubscribe command is queued (only if a command sender is
/// installed; send and serialization failures are ignored), and the channel
/// is dropped from the subscription bookkeeping. Unknown channels and
/// channels that still have callbacks cause no command to be sent.
fn unsubscribe(&self, channel: &str, callback_id: u64) {
    // Phase 1: update the listener map and release its lock before touching
    // any other lock.
    let channel_emptied = {
        let mut registry = self.listeners.write();
        let mut emptied = false;
        if let Some(entries) = registry.get_mut(channel) {
            entries.retain(|entry| entry.id != callback_id);
            emptied = entries.is_empty();
        }
        if emptied {
            registry.remove(channel);
        }
        emptied
    };
    if !channel_emptied {
        return;
    }

    // Phase 2: tell the server, best effort.
    {
        let sender_slot = self.command_tx.read();
        if let Some(tx) = sender_slot.as_ref() {
            let request = UnsubscribeRequest {
                channel: channel.to_string(),
            };
            let cmd = CentrifugeCommand {
                id: self.next_command_id(),
                connect: None,
                subscribe: None,
                unsubscribe: Some(request),
            };
            if let Ok(json) = serde_json::to_string(&cmd) {
                let _ = tx.send(Message::Text(json.into()));
            }
        }
    }

    // Phase 3: forget the subscription locally.
    self.subscriptions.write().remove(channel);
    log::info!("[streaming] unsubscribed from channel: {}", channel);
}
/// Invokes every callback registered for `channel` with a clone of `data`.
///
/// Callbacks run in registration order while the listener map's read lock is
/// held. Channels without listeners are ignored.
fn dispatch_message(&self, channel: &str, data: Value) {
    let registry = self.listeners.read();
    for entry in registry.get(channel).into_iter().flatten() {
        (entry.callback)(data.clone());
    }
}
/// Queues a subscribe command for `channel`, optionally carrying `filter`.
///
/// The command always requests the `"fossil"` delta mode. Nothing happens
/// (and no command id is consumed) when no command sender is installed;
/// serialization and send failures are ignored.
fn send_subscribe(&self, channel: &str, filter: Option<&str>) {
    let sender_slot = self.command_tx.read();
    let tx = match sender_slot.as_ref() {
        Some(tx) => tx,
        None => return,
    };
    let cmd = CentrifugeCommand {
        id: self.next_command_id(),
        connect: None,
        subscribe: Some(SubscribeRequest {
            channel: channel.to_string(),
            delta: Some(String::from("fossil")),
            filter: filter.map(String::from),
        }),
        unsubscribe: None,
    };
    if let Ok(json) = serde_json::to_string(&cmd) {
        let _ = tx.send(Message::Text(json.into()));
    }
}
}
/// Public streaming client: connects over WebSocket and dispatches channel
/// publications to registered callbacks.
pub struct StreamApi {
/// State shared with the spawned reader/writer tasks and `Unsubscribe` handles.
inner: Arc<StreamApiInner>,
}
impl StreamApi {
/// Creates a client for `url` authenticated with `access_token`.
///
/// The token is appended to the URL as a `token` query parameter, joined with
/// `&` when the URL already contains a `?` and with `?` otherwise. The token
/// is inserted verbatim (no percent-encoding). No connection is opened here.
pub fn new(url: &str, access_token: &str) -> Self {
    let separator = if url.contains('?') { '&' } else { '?' };
    let url_with_token = format!("{}{}token={}", url, separator, access_token);
    let state = StreamApiInner::new(url_with_token, access_token.to_string());
    Self {
        inner: Arc::new(state),
    }
}
/// Reports the current value of the shared connection flag.
pub fn is_connected(&self) -> bool {
    let flag = &self.inner.connected;
    flag.load(Ordering::SeqCst)
}
pub async fn connect(&self) -> Result<(), ChainStreamError> {
if self.is_connected() {
return Ok(());
}
let url = &self.inner.url;
log::info!("[streaming] connecting to {}", url);
let (ws_stream, _) = connect_async(url)
.await
.map_err(|e| ChainStreamError::WebSocket(format!("Failed to connect: {}", e)))?;
let (mut write, mut read) = ws_stream.split();
let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
*self.inner.command_tx.write() = Some(tx.clone());
let connect_cmd = CentrifugeCommand {
id: self.inner.next_command_id(),
connect: Some(ConnectRequest {
token: self.inner.access_token.clone(),
}),
subscribe: None,
unsubscribe: None,
};
let connect_json = serde_json::to_string(&connect_cmd)
.map_err(|e| ChainStreamError::Serialization(e.to_string()))?;
write
.send(Message::Text(connect_json.into()))
.await
.map_err(|e| ChainStreamError::WebSocket(format!("Failed to send connect: {}", e)))?;
self.inner.connected.store(true, Ordering::SeqCst);
let inner_write = self.inner.clone();
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
if write.send(msg).await.is_err() {
inner_write.connected.store(false, Ordering::SeqCst);
break;
}
}
});
let inner_read = self.inner.clone();
tokio::spawn(async move {
while let Some(msg) = read.next().await {
match msg {
Ok(Message::Text(text)) => {
if let Ok(response) = serde_json::from_str::<CentrifugeResponse>(&text) {
if let Some(push) = response.push {
if let Some(pub_data) = push.pub_data {
inner_read.dispatch_message(&push.channel, pub_data.data);
}
}
if let Some(err) = response.error {
log::error!(
"[streaming] error: code={}, message={}",
err.code,
err.message
);
}
}
}
Ok(Message::Close(_)) => {
log::info!("[streaming] connection closed");
inner_read.connected.store(false, Ordering::SeqCst);
break;
}
Ok(Message::Ping(data)) => {
if let Some(tx) = inner_read.command_tx.read().as_ref() {
let _ = tx.send(Message::Pong(data));
}
}
Err(e) => {
log::error!("[streaming] read error: {}", e);
inner_read.connected.store(false, Ordering::SeqCst);
break;
}
_ => {}
}
}
});
Ok(())
}
/// Closes the connection from the client side.
///
/// Takes the command sender (if any) and queues a Close frame through it,
/// clears the subscription bookkeeping and resets the connection flag.
/// Registered listeners are kept.
pub async fn disconnect(&self) {
    if let Some(tx) = self.inner.command_tx.write().take() {
        let _ = tx.send(Message::Close(None));
    }
    // The server forgets every subscription once the socket is gone. Stale
    // entries would make a later `subscribe` on the same channel skip the
    // subscribe command and never receive data.
    self.inner.subscriptions.write().clear();
    self.inner.connected.store(false, Ordering::SeqCst);
    log::info!("[streaming] disconnected");
}
/// Registers `callback` for `channel`, subscribing on the server if needed.
///
/// Connects first when the connection flag is not set. The subscribe command
/// is only sent when the channel is not yet recorded as subscribed; for an
/// already-recorded channel the callback is simply added and `filter` is not
/// sent again.
///
/// An empty or absent `filter` sends no filter. A non-empty filter is passed
/// through `replace_filter_fields` when `method_name` is given, and sent
/// unchanged otherwise.
///
/// Returns an [`Unsubscribe`] handle for the registered callback.
///
/// # Errors
///
/// Propagates any error from [`StreamApi::connect`].
pub async fn subscribe<F>(
    &self,
    channel: &str,
    callback: F,
    filter: Option<&str>,
    method_name: Option<&str>,
) -> Result<Unsubscribe, ChainStreamError>
where
    F: Fn(Value) + Send + Sync + 'static,
{
    if !self.is_connected() {
        self.connect().await?;
    }
    let processed_filter = match (filter, method_name) {
        (Some(f), Some(m)) if !f.is_empty() => Some(replace_filter_fields(f, m)),
        (Some(f), _) if !f.is_empty() => Some(f.to_string()),
        _ => None,
    };
    let callback_id = self.inner.add_listener(channel, callback);
    // Check and record the channel under a single write lock so that
    // concurrent callers cannot both decide to send the subscribe command.
    let needs_subscribe = {
        let mut subs = self.inner.subscriptions.write();
        if subs.contains_key(channel) {
            false
        } else {
            subs.insert(channel.to_string(), self.inner.next_command_id());
            true
        }
    };
    if needs_subscribe {
        self.inner
            .send_subscribe(channel, processed_filter.as_deref());
        log::info!("[streaming] subscribed to channel: {}", channel);
    }
    Ok(Unsubscribe {
        channel: channel.to_string(),
        callback_id,
        api: self.inner.clone(),
    })
}
pub async fn subscribe_token_candles<F>(
&self,
chain: &str,
token_address: &str,
resolution: Resolution,
callback: F,
filter: Option<&str>,
price_type: Option<PriceType>,
) -> Result<Unsubscribe, ChainStreamError>
where
F: Fn(Candle) + Send + Sync + 'static,
{
let prefix = match price_type.unwrap_or_default() {
PriceType::Native => "dex-candle-in-native",
PriceType::Usd => "dex-candle",
};
let channel = format!("{}:{}_{}_{}", prefix, chain, token_address, resolution);
self.subscribe(
&channel,
move |data| {
if let Ok(candle) = parse_candle(&data) {
callback(candle);
}
},
filter,
Some("subscribe_token_candles"),
)
.await
}
pub async fn subscribe_pool_candles<F>(
&self,
chain: &str,
pool_address: &str,
resolution: Resolution,
callback: F,
filter: Option<&str>,
price_type: Option<PriceType>,
) -> Result<Unsubscribe, ChainStreamError>
where
F: Fn(Candle) + Send + Sync + 'static,
{
let prefix = match price_type.unwrap_or_default() {
PriceType::Native => "dex-pool-candle-in-native",
PriceType::Usd => "dex-pool-candle",
};
let channel = format!("{}:{}_{}_{}", prefix, chain, pool_address, resolution);
self.subscribe(
&channel,
move |data| {
if let Ok(candle) = parse_candle(&data) {
callback(candle);
}
},
filter,
Some("subscribe_pool_candles"),
)
.await
}
pub async fn subscribe_pair_candles<F>(
&self,
chain: &str,
pair_address: &str,
resolution: Resolution,
callback: F,
filter: Option<&str>,
price_type: Option<PriceType>,
) -> Result<Unsubscribe, ChainStreamError>
where
F: Fn(Candle) + Send + Sync + 'static,
{
let prefix = match price_type.unwrap_or_default() {
PriceType::Native => "dex-pair-candle-in-native",
PriceType::Usd => "dex-pair-candle",
};
let channel = format!("{}:{}_{}_{}", prefix, chain, pair_address, resolution);
self.subscribe(
&channel,
move |data| {
if let Ok(candle) = parse_candle(&data) {
callback(candle);
}
},
filter,
Some("subscribe_pair_candles"),
)
.await
}
pub async fn subscribe_token_stats<F>(
&self,
chain: &str,
token_address: &str,
callback: F,
filter: Option<&str>,
) -> Result<Unsubscribe, ChainStreamError>
where
F: Fn(TokenStat) + Send + Sync + 'static,
{
let channel = format!("dex-token-stats:{}_{}", chain, token_address);
self.subscribe(
&channel,
move |data| {
if let Ok(stat) = serde_json::from_value::<TokenStat>(data) {
callback(stat);
}
},
filter,
Some("subscribe_token_stats"),
)
.await
}
pub async fn subscribe_new_token<F>(
&self,
chain: &str,
callback: F,
filter: Option<&str>,
) -> Result<Unsubscribe, ChainStreamError>
where
F: Fn(NewToken) + Send + Sync + 'static,
{
let channel = format!("dex-new-token:{}", chain);
self.subscribe(
&channel,
move |data| {
if let Ok(token) = parse_new_token(&data) {
callback(token);
}
},
filter,
Some("subscribe_new_token"),
)
.await
}
pub async fn subscribe_token_trade<F>(
&self,
chain: &str,
token_address: &str,
callback: F,
filter: Option<&str>,
) -> Result<Unsubscribe, ChainStreamError>
where
F: Fn(TradeActivity) + Send + Sync + 'static,
{
let channel = format!("dex-trade:{}_{}", chain, token_address);
self.subscribe(
&channel,
move |data| {
if let Ok(trade) = parse_trade_activity(&data) {
callback(trade);
}
},
filter,
Some("subscribe_token_trades"),
)
.await
}
pub async fn subscribe_wallet_balance<F>(
&self,
chain: &str,
wallet_address: &str,
callback: F,
filter: Option<&str>,
) -> Result<Unsubscribe, ChainStreamError>
where
F: Fn(WalletBalance) + Send + Sync + 'static,
{
let channel = format!("dex-wallet-balance:{}_{}", chain, wallet_address);
self.subscribe(
&channel,
move |data| {
if let Ok(balance) = serde_json::from_value::<WalletBalance>(data) {
callback(balance);
}
},
filter,
Some("subscribe_wallet_balance"),
)
.await
}
pub async fn subscribe_token_holders<F>(
&self,
chain: &str,
token_address: &str,
callback: F,
filter: Option<&str>,
) -> Result<Unsubscribe, ChainStreamError>
where
F: Fn(TokenHolder) + Send + Sync + 'static,
{
let channel = format!("dex-token-holder:{}_{}", chain, token_address);
self.subscribe(
&channel,
move |data| {
if let Ok(holder) = serde_json::from_value::<TokenHolder>(data) {
callback(holder);
}
},
filter,
Some("subscribe_token_holders"),
)
.await
}
pub async fn subscribe_token_supply<F>(
&self,
chain: &str,
token_address: &str,
callback: F,
filter: Option<&str>,
) -> Result<Unsubscribe, ChainStreamError>
where
F: Fn(TokenSupply) + Send + Sync + 'static,
{
let channel = format!("dex-token-supply:{}_{}", chain, token_address);
self.subscribe(
&channel,
move |data| {
if let Ok(supply) = serde_json::from_value::<TokenSupply>(data) {
callback(supply);
}
},
filter,
Some("subscribe_token_supply"),
)
.await
}
pub async fn subscribe_dex_pool_balance<F>(
&self,
chain: &str,
pool_address: &str,
callback: F,
filter: Option<&str>,
) -> Result<Unsubscribe, ChainStreamError>
where
F: Fn(DexPoolBalance) + Send + Sync + 'static,
{
let channel = format!("dex-pool-balance:{}_{}", chain, pool_address);
self.subscribe(
&channel,
move |data| {
if let Ok(balance) = serde_json::from_value::<DexPoolBalance>(data) {
callback(balance);
}
},
filter,
Some("subscribe_dex_pool_balance"),
)
.await
}
pub async fn subscribe_token_max_liquidity<F>(
&self,
chain: &str,
token_address: &str,
callback: F,
filter: Option<&str>,
) -> Result<Unsubscribe, ChainStreamError>
where
F: Fn(TokenMaxLiquidity) + Send + Sync + 'static,
{
let channel = format!("dex-token-max-liquidity:{}_{}", chain, token_address);
self.subscribe(
&channel,
move |data| {
if let Ok(liquidity) = serde_json::from_value::<TokenMaxLiquidity>(data) {
callback(liquidity);
}
},
filter,
Some("subscribe_token_max_liquidity"),
)
.await
}
pub async fn subscribe_token_total_liquidity<F>(
&self,
chain: &str,
token_address: &str,
callback: F,
filter: Option<&str>,
) -> Result<Unsubscribe, ChainStreamError>
where
F: Fn(TokenTotalLiquidity) + Send + Sync + 'static,
{
let channel = format!("dex-token-total-liquidity:{}_{}", chain, token_address);
self.subscribe(
&channel,
move |data| {
if let Ok(liquidity) = serde_json::from_value::<TokenTotalLiquidity>(data) {
callback(liquidity);
}
},
filter,
Some("subscribe_token_total_liquidity"),
)
.await
}
pub async fn subscribe_wallet_pnl<F>(
&self,
chain: &str,
wallet_address: &str,
callback: F,
filter: Option<&str>,
) -> Result<Unsubscribe, ChainStreamError>
where
F: Fn(WalletTokenPnl) + Send + Sync + 'static,
{
let channel = format!("dex-wallet-pnl:{}_{}", chain, wallet_address);
self.subscribe(
&channel,
move |data| {
if let Ok(pnl) = serde_json::from_value::<WalletTokenPnl>(data) {
callback(pnl);
}
},
filter,
Some("subscribe_wallet_pnl"),
)
.await
}
pub async fn subscribe_new_tokens_metadata<F>(
&self,
chain: &str,
callback: F,
) -> Result<Unsubscribe, ChainStreamError>
where
F: Fn(Vec<TokenMetadata>) + Send + Sync + 'static,
{
let channel = format!("dex-new-tokens-metadata:{}", chain);
self.subscribe(
&channel,
move |data| {
if let Some(arr) = data.as_array() {
let result: Vec<TokenMetadata> = arr
.iter()
.filter_map(|item| item.as_object().map(parse_token_metadata))
.collect();
callback(result);
}
},
None,
Some("subscribe_new_tokens_metadata"),
)
.await
}
pub async fn subscribe_new_tokens<F>(
&self,
chain: &str,
callback: F,
) -> Result<Unsubscribe, ChainStreamError>
where
F: Fn(Vec<TokenMetadata>) + Send + Sync + 'static,
{
let channel = format!("dex-new-tokens:{}", chain);
self.subscribe(
&channel,
move |data| {
if let Some(arr) = data.as_array() {
let result: Vec<TokenMetadata> = arr
.iter()
.filter_map(|item| item.as_object().map(parse_token_metadata))
.collect();
callback(result);
}
},
None,
Some("subscribe_new_tokens"),
)
.await
}
pub async fn subscribe_ranking_tokens_list<F>(
&self,
chain: &str,
ranking_type: RankingType,
callback: F,
filter: Option<&str>,
) -> Result<Unsubscribe, ChainStreamError>
where
F: Fn(RankingTokenList) + Send + Sync + 'static,
{
let ranking_str = match ranking_type {
RankingType::New => "new",
RankingType::Hot => "trending",
RankingType::Stocks => "stocks",
RankingType::FinalStretch => "completed",
RankingType::Migrated => "graduated",
};
let channel = format!("dex-ranking-token-list:{}_{}", chain, ranking_str);
self.subscribe(
&channel,
move |data| {
if let Ok(ranking) = serde_json::from_value::<RankingTokenList>(data) {
callback(ranking);
}
},
filter,
None,
)
.await
}
}
/// Converts a candle payload into a [`Candle`].
///
/// Reads the short keys `a`, `o`, `c`, `h`, `l`, `v`, `r` (strings), `t`
/// (i64) and `n` (i32); missing or mistyped keys yield the helpers' defaults.
///
/// # Errors
///
/// Returns `"expected object"` when `data` is not a JSON object.
fn parse_candle(data: &Value) -> Result<Candle, String> {
    let fields = match data.as_object() {
        Some(map) => map,
        None => return Err("expected object".to_string()),
    };
    let text = |key: &str| get_string(fields, key);
    let candle = Candle {
        address: text("a"),
        open: text("o"),
        close: text("c"),
        high: text("h"),
        low: text("l"),
        volume: text("v"),
        resolution: text("r"),
        time: get_i64(fields, "t"),
        number: get_i32(fields, "n"),
    };
    Ok(candle)
}
fn parse_social_media(obj: &serde_json::Map<String, Value>) -> SocialMedia {
SocialMedia {
twitter: obj
.get("tw")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
telegram: obj
.get("tg")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
website: obj.get("w").and_then(|v| v.as_str()).map(|s| s.to_string()),
tiktok: obj
.get("tt")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
discord: obj
.get("dc")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
facebook: obj
.get("fb")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
github: obj
.get("gh")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
instagram: obj
.get("ig")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
linkedin: obj
.get("li")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
medium: obj
.get("md")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
reddit: obj
.get("rd")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
youtube: obj
.get("yt")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
bitbucket: obj
.get("bb")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
}
}
fn parse_dex_protocol(obj: &serde_json::Map<String, Value>) -> DexProtocol {
DexProtocol {
program_address: obj
.get("pa")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
protocol_family: obj
.get("pf")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
protocol_name: obj
.get("pn")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
}
}
fn parse_token_metadata(obj: &serde_json::Map<String, Value>) -> TokenMetadata {
TokenMetadata {
token_address: get_string(obj, "a"),
name: obj.get("n").and_then(|v| v.as_str()).map(|s| s.to_string()),
decimals: obj.get("dec").and_then(|v| v.as_i64()).map(|v| v as i32),
symbol: obj.get("s").and_then(|v| v.as_str()).map(|s| s.to_string()),
image_url: obj
.get("iu")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
description: obj
.get("de")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
social_media: obj
.get("sm")
.and_then(|v| v.as_object())
.map(parse_social_media),
created_at_ms: obj.get("cts").and_then(|v| v.as_i64()),
coingecko_coin_id: obj
.get("cgi")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
launch_from: obj
.get("lf")
.and_then(|v| v.as_object())
.map(parse_dex_protocol),
migrated_to: obj
.get("mt")
.and_then(|v| v.as_object())
.map(parse_dex_protocol),
}
}
fn parse_new_token(data: &Value) -> Result<NewToken, String> {
let obj = data
.as_object()
.ok_or_else(|| "expected object".to_string())?;
Ok(NewToken {
token_address: get_string(obj, "a"),
name: get_string(obj, "n"),
symbol: get_string(obj, "s"),
decimals: obj.get("dec").and_then(|v| v.as_i64()).map(|v| v as i32),
image_url: obj
.get("iu")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
description: obj
.get("de")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
social_media: obj
.get("sm")
.and_then(|v| v.as_object())
.map(parse_social_media),
coingecko_coin_id: obj
.get("cgi")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
launch_from: obj
.get("lf")
.and_then(|v| v.as_object())
.map(parse_dex_protocol),
migrated_to: obj
.get("mt")
.and_then(|v| v.as_object())
.map(parse_dex_protocol),
created_at_ms: get_i64(obj, "cts"),
})
}
/// Converts a trade payload into a [`TradeActivity`].
///
/// All string fields default to an empty string and `timestamp` (`t`) to `0`
/// when their key is missing or has a different JSON type.
///
/// # Errors
///
/// Returns `"expected object"` when `data` is not a JSON object.
fn parse_trade_activity(data: &Value) -> Result<TradeActivity, String> {
    let fields = match data.as_object() {
        Some(map) => map,
        None => return Err("expected object".to_string()),
    };
    let text = |key: &str| get_string(fields, key);
    let trade = TradeActivity {
        token_address: text("a"),
        timestamp: get_i64(fields, "t"),
        kind: text("k"),
        buy_amount: text("ba"),
        buy_amount_in_usd: text("baiu"),
        buy_token_address: text("btma"),
        buy_token_name: text("btn"),
        buy_token_symbol: text("bts"),
        buy_wallet_address: text("bwa"),
        sell_amount: text("sa"),
        sell_amount_in_usd: text("saiu"),
        sell_token_address: text("stma"),
        sell_token_name: text("stn"),
        sell_token_symbol: text("sts"),
        sell_wallet_address: text("swa"),
        tx_hash: text("h"),
    };
    Ok(trade)
}
fn get_string(obj: &serde_json::Map<String, Value>, key: &str) -> String {
obj.get(key)
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string()
}
/// Returns the integer stored under `key`, or `0` when the key is missing or
/// its value is not representable as an `i64`.
fn get_i64(obj: &serde_json::Map<String, Value>, key: &str) -> i64 {
    match obj.get(key) {
        Some(value) => value.as_i64().unwrap_or(0),
        None => 0,
    }
}
/// Returns the integer stored under `key` as an `i32`.
///
/// Yields `0` when the key is missing, when its value is not an integer, or
/// when the integer does not fit in an `i32` (the previous `as` cast wrapped
/// such values silently).
fn get_i32(obj: &serde_json::Map<String, Value>, key: &str) -> i32 {
    obj.get(key)
        .and_then(|v| v.as_i64())
        // Checked narrowing; the trait is named in full so this compiles
        // whether or not `TryFrom` is in the prelude.
        .and_then(|v| <i32 as std::convert::TryFrom<i64>>::try_from(v).ok())
        .unwrap_or_default()
}