use std::{fmt::format, future::Future};
use anyhow::{Context, Error, Result};
use serde_json::Value;
use tokio_stream::StreamExt as _;
use once_cell::sync::Lazy;
use reqwest::{StatusCode, Url};
use tokio::{
sync::mpsc,
time::{Duration, Instant, sleep_until},
};
use crate::{
types::{PriceFeed, TokenPriceInfo},
utils::apply_exponent,
};
/// Shared HTTP client used for every Hermes request in this file.
///
/// Wrapped in `Lazy` so the client is constructed on first access and then reused.
static CLIENT: Lazy<reqwest::Client> = Lazy::new(reqwest::Client::new);
/// Downloads the list of crypto price feeds from the Pyth Hermes API.
///
/// # Errors
///
/// Returns an error when the request cannot be sent, when the response status
/// is anything other than `200 OK`, when the body cannot be read, or when the
/// body does not deserialize into `Vec<PriceFeed>`.
pub async fn get_price_feeds() -> Result<Vec<PriceFeed>> {
    const ENDPOINT: &str = "https://hermes.pyth.network/v2/price_feeds?asset_type=crypto";

    let response = CLIENT
        .get(ENDPOINT)
        .send()
        .await
        .context("Failed to send request to Pyth Hermes API")?;

    // Only an exact 200 is accepted; every other status is reported to the caller.
    match response.status() {
        StatusCode::OK => {}
        other => anyhow::bail!("Unexpected status code: {}", other),
    }

    let raw = response
        .text()
        .await
        .context("Failed to read response body from Hermes API")?;

    serde_json::from_str::<Vec<PriceFeed>>(&raw)
        .context("Failed to deserialize response into Vec<PriceFeed>")
}
pub async fn get_live_price_stream<F, Fut>(ids: &Vec<String>, call_back: F) -> Result<String>
where
F: Fn(Vec<TokenPriceInfo>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send,
{
let url = Url::parse_with_params(
"https://hermes.pyth.network/v2/updates/price/stream",
ids.iter().map(|id| ("ids[]", id.as_str())),
)
.map_err(|e| e)?;
let res = CLIENT.get(url).send().await.map_err(|e| e)?;
let stream = res.bytes_stream();
tokio::pin!(stream);
let mut buffer = String::new();
while let Some(chunk_result) = futures::StreamExt::next(&mut stream).await {
match chunk_result {
Ok(bytes) => {
if let Ok(text) = std::str::from_utf8(&bytes) {
buffer.push_str(text);
while let Some(pos) = buffer.find('\n') {
let line = buffer[..pos].trim().to_string();
buffer = buffer[pos + 1..].to_string();
if line.starts_with("data:") {
let json_str = line[5..].trim();
match serde_json::from_str::<serde_json::Value>(json_str) {
Ok(parsed) => {
let data = parsed["parsed"].as_array().unwrap();
let result = parse_token_data(data).await.map_err(|e| e)?;
call_back(result).await;
}
Err(e) => {
eprintln!("❌ JSON parse error: {}", e);
eprintln!("⛔ Raw data: {}", json_str);
}
}
}
}
}
}
Err(e) => {
eprintln!("❌ Stream error: {}", e);
break;
}
}
}
Ok("Stream reading".to_string())
}
pub async fn get_token_price_info(ids: &Vec<String>) -> Result<Vec<TokenPriceInfo>> {
let url = format!(
"https://hermes.pyth.network/v2/updates/price/latest?{}",
ids.iter()
.map(|id| format!("ids[]={}", id))
.collect::<Vec<_>>()
.join("&")
);
let resp = CLIENT
.get(&url)
.send()
.await
.context("Failed to send price fetch request")?;
let body = resp
.text()
.await
.context("Failed to read latest price response body")?;
let parsed: serde_json::Value =
serde_json::from_str(&body).context("Failed to parse latest price response as JSON")?;
let data = parsed["parsed"]
.as_array()
.context("Expected `parsed` field to be an array")?;
let result = parse_token_data(data).await.map_err(|e| e)?;
Ok(result)
}
pub async fn parse_token_data(data: &Vec<Value>) -> Result<Vec<TokenPriceInfo>> {
let tokens = get_price_feeds().await?;
data.iter()
.map(|entry| {
let id = entry["id"]
.as_str()
.context("Missing `id` in price update")?
.to_string();
let price_detail = &entry["price"];
let ema_detail = &entry["ema_price"];
let price_str = price_detail["price"]
.as_str()
.context("Missing price string")?;
let price_expo = price_detail["expo"]
.as_i64()
.context("Missing price expo")?;
let ema_str = ema_detail["price"].as_str().context("Missing ema string")?;
let ema_expo = ema_detail["expo"].as_i64().context("Missing ema expo")?;
let price = apply_exponent(price_str, price_expo as i32);
let ema = apply_exponent(ema_str, ema_expo as i32);
let fluctuation = if ema != 0.0 {
((price - ema) / ema) * 100.0
} else {
0.0
};
let token = tokens.iter().find(|t| t.id == id);
Ok(TokenPriceInfo {
name: token
.map(|t| t.attributes.description.clone())
.unwrap_or_else(|| "Unknown".into()),
token_id: id.clone(),
token_symbol: token
.and_then(|t| {
t.attributes
.symbol
.split('.')
.nth(1)?
.split('/')
.next()
.map(str::to_string)
})
.unwrap_or_else(|| "Unknown".into()),
price_30s: price,
price_1m: ema,
timestamp: price_detail["publish_time"].as_i64().unwrap_or_default(), fluctuation_pct: fluctuation,
})
})
.collect::<Result<Vec<_>>>()
}
pub async fn get_price_stream_for_duration(
ids: Vec<String>,
duration_secs: u64,
tx: mpsc::Sender<TokenPriceInfo>,
) -> Result<(), Error> {
println!("hereee");
let url = Url::parse_with_params(
"https://hermes.pyth.network/v2/updates/price/stream",
ids.iter().map(|id| ("ids[]", id.as_str())),
)
.map_err(|e| e)?;
let res = CLIENT.get(url).send().await.map_err(|e| e)?;
let stream = res.bytes_stream();
tokio::pin!(stream);
let deadline = Instant::now() + Duration::from_secs(duration_secs);
let mut buffer = String::new();
let all_prices: Vec<TokenPriceInfo> = vec![];
loop {
tokio::select! {
biased;
_ = sleep_until(deadline) => {
println!("⏱️ Stream time limit reached");
break;
}
maybe_chunk = stream.next() => {
match maybe_chunk {
Some(Ok(bytes)) => {
if let Ok(text) = std::str::from_utf8(&bytes) {
buffer.push_str(text);
while let Some(pos) = buffer.find('\n') {
let line = buffer[..pos].trim().to_string();
buffer = buffer[pos + 1..].to_string();
if line.starts_with("data:") {
let json_str = line[5..].trim();
match serde_json::from_str::<serde_json::Value>(json_str) {
Ok(parsed) => {
if let Some(data) = parsed["parsed"].as_array() {
let prices = parse_token_data(data).await.map_err(|e| {
e
})?;
for price in prices {
if let Err(e) = tx.send(price).await {
eprintln!("❌ Failed to send price update: {}", e);
break;
}
}
}
}
Err(e) => {
eprintln!("❌ JSON parse error: {}", e);
eprintln!("⛔ Raw data: {}", json_str);
}
}
}
}
}
}
Some(Err(e)) => {
eprintln!("❌ Stream error: {}", e);
break;
}
None => break,
}
}
}
}
Ok(())
}
/// Looks up the latest prices for the given token symbols, quoted in USD.
///
/// Each symbol is upper-cased and matched against the feed catalogue: a feed
/// matches when its `base` equals the symbol and its `display_symbol` equals
/// `"<SYMBOL>/USD"`. Symbols without a matching feed are ignored.
///
/// # Returns
///
/// The price information returned by `get_token_price_info` for the matched
/// feeds, or an empty vector when no symbol matched any feed.
///
/// # Errors
///
/// Returns an error when the feed catalogue or the latest prices cannot be
/// fetched.
pub async fn search_by_token_symbols(symbols: Vec<&str>) -> Result<Vec<TokenPriceInfo>> {
    let price_feeds = get_price_feeds().await?;

    // Collect only the ids; the rest of each feed is not needed here.
    let feed_ids: Vec<String> = symbols
        .iter()
        .filter_map(|symbol| {
            let symbol = symbol.to_uppercase();
            let target = format!("{}/USD", symbol);
            price_feeds
                .iter()
                .find(|feed| {
                    feed.attributes.base == symbol && target == feed.attributes.display_symbol
                })
                .map(|feed| feed.id.clone())
        })
        .collect();

    // No match means no results; skip a price request that would carry no ids.
    if feed_ids.is_empty() {
        return Ok(Vec::new());
    }

    get_token_price_info(&feed_ids)
        .await
        .context("Problem getting search results")
}