//! simple_pyth_client_rs 0.1.0
//!
//! Rust wrapper for Pyth Hermes crypto price feeds.
use std::{fmt::format, future::Future};

// use futures::StreamExt;

use anyhow::{Context, Error, Result};
use serde_json::Value;
use tokio_stream::StreamExt as _;

use once_cell::sync::Lazy;
use reqwest::{StatusCode, Url};

use tokio::{
    sync::mpsc,
    time::{Duration, Instant, sleep_until},
};

use crate::{
    types::{PriceFeed, TokenPriceInfo},
    utils::apply_exponent,
};

/// HTTP client shared by every request in this module; built lazily on first access.
static CLIENT: Lazy<reqwest::Client> = Lazy::new(reqwest::Client::new);

// static TOKENS: Lazy<Vec<&str>> = Lazy::new(|| {
//     vec![
//         "23d7315113f5b1d3ba7a83604c44b94d79f4fd69af77f804fc7f920a6dc65744", //SUI
//         "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", // BTC
//         "ff61491a931112ddf1bd8147cd1b641375f79f5825126d665480874634fd0ace", // ETH
//         "ef0d8b6fda2ceba41da15d4095d1da392a0d2f8ed0c6c7bc0f4cfac8c280b56d", // SOL
//         "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b", // USDT
//         "eaa020c61cc479712813461ce153894a96a6c00b21ed0cfc2798d1f9a9e9c94a", // USDC
//         "b0948a5e5313200c632b51bb5ca32f6de0d36e9950a942d19751e833f70dabfd", // DAI
//         "433faaa801ecdb6618e3897177a118b273a8e18cc3ff545aadfc207d58d028f7", // TUSD
//         "ef94acc2fb09eb976c6eb3000bab898cab891d5b800702cd1dc88e61d7c3c5e6", // USTC
//         "cd2cee36951a571e035db0dfad138e6ecdb06b517cc3373cd7db5d3609b7927c", // MEME
//         "f0d57deca57b3da2fe63a493f4c25925fdfd8edf834b20f93e1f84dbd1504d4a", // SHIB
//         "dcef50dd0a4cd2dcc17e45df1676dcb336a11a61c69df7a0299b0150c672d25c", // DOGE
//         "15add95022ae13563a11992e727c91bdb6b55bc183d9d747436c80a483d8c864", // APE
//         "bed3097008b9b5e3c93bec20be79cb43986b85a996475589351a21e67bae9b61", // PENGU
//         "63a45218d6b13ffd28ca04748615511bf70eff80a3411c97d96b8ed74a6decab", // MICHI
//         "62742a997d01f7524f791fdb2dd43aaf0e567d765ebf8fd0406a994239e874d4", // MOTHER
//         "9b5729efe3d68e537cdcb2ca70444dea5f06e1660b562632609757076d0b9448", // BRETT
//         "9c93e4a22c56885af427ac4277437e756e7ec403fbc892f975d497383bb33560", // DEGEN
//         "6b1381ce7e874dc5410b197ac8348162c0dd6c0d4c9cd6322672d6c2b1d58293", // FLOKI
//         "c80657b7f6f3eac27218d09d5a4e54e47b25768d9f5e10ac15fe2cf900881400", // FIDA (DAO-ish)
//         "c415de8d2eba7db216527dff4b60e8f3a5311c740dadb233e13e12547e226750", // NEAR
//         "b27578a9654246cb0a2950842b92330e9ace141c52b63829cc72d5c45a5a595a", // ETHFI
//         "0caec284d34d836ca325cf7b3256c078c597bc052fbd3c0283d52b581d68d71f", // RSETH (DAO / LSD)
//         "a0255134973f4fdf2f8f7808354274a3b1ebc6ee438be898d045e8b56ba1fe13", // RETH (Rocket Pool - RWA staking)
//         "846ae1bdb6300b817cee5fdee2a6da192775030db5615b94a465f53bd40850b5", // STETH (Lido)
//         "b7e3904c08ddd9c0c10c6d207d390fd19e87eb6aab96304f571ed94caebdefa0", // AXS (Gaming)
//         "baa284eaf23edf975b371ba2818772f93dbae72836bbdea28b07d40f3cf8b485", // GMT (Gaming)
//         "681e0eb7acf9a2a3384927684d932560fb6f67c6beb21baa0f110e993b265386", // ATLAS (Gaming)
//         "b17e5bc5de742a8a378b54c9c75442b7d51e30ada63f28d9bd28d3c0e26511a0", // KMNO (RWA)
//         "70cd05521e3bdeaee2cadc1360f0d95397f03275f273199be35a029114f53a3b", // BIFI (RWA/DeFi infra)
//         "3fa4252848f9f0a1480be62745a4629d9eb1322aebab8a791e344b3b9c1adcf5", // ARB (DAO)
//         "8ac0c70fff57e9aefdf5edf44b51d62c2d433653cbb2cf5cc06bb115af04d221", // LINK (Oracle infra)
//         "b00b60f88b03a6a625a8d1c048c3f66653edf217439983d037e7222c4e612819", // ATOM (DAO / Layer 0)
//         "c2289a6a43d2ce91c6f55caec370f4acc38a2ed477f58813334c6d03749ff2a4", // MSOL (LSD - DAO)
//         "2b9ab1e972a281585084148ba1389800799bd4be63b957507db1349314e47445", // AAVE (DAO)
//         "93da3352f9f1d105fdfe4971cfa80e9dd777bfc5d0f683ebb6e1294b92137bb7", // AVAX
//         "c7b72e5d860034288c9335d4d325da4272fe50c92ab72249d58f6cbba30e4c44", // IOTA
//         "3dd2b63686a450ec7290df3a1e0b583c0481f651351edfa7636f39aed55cf8a3", // BCH
//         "6e3f3fa8253588df9326580180233eb791e03b443a3ba7a1d892e73874e19a54", // LTC
//         "78d185a741d07edb3412b09008b7c5cfb9bbbd7d568bf00ba737b456ba171501", // UNI
//         "b82449fd728133488d2d41131cffe763f9c1693b73c544d9ef6aaa371060dd25", // WOO
//         "6489800bb8974169adfe35937bf6736507097d13c190d760c557108c7e93a81b", // DYDX
//         "385f64d993f7b77d8182ed5003d97c60aa3361f3cecfe711544d2d59165e9bdf", // OP
//         "c65db025687356496e8653d0d6608eec64ce2d96e2e28c530e574f0e4f712380", // EIGEN (staking/RWA infra)
//         "c8acad81438490d4ebcac23b3e93f31cdbcb893fcba746ea1c66b89684faae2f", // KCS (Exchange DAO)
//         "ccca1d2b0d9a9ca72aa2c849329520a378aea0ec7ef14497e67da4050d6cf578", // ALICE (Gaming)
//         "03e8dbf3e8f02edf5ca898dc7afbbac3f06c7d91c02986c3a8c6ce1a99e90355", // MERL (DeFi infra)
//         "e6ccd3f878cf338e6732bf59f60943e8ca2c28402fc4d9c258503b2edbe74a31", // LUNA (RWA fallout)
//         "4456d442a152fd1f972b18459263ef467d3c29fb9d667e30c463b086691fbc79", // LUNC (RWA fallout)
//     ]
// });
/// Fetches the list of crypto price feeds from the Pyth Hermes API.
///
/// Sends a `GET` to `/v2/price_feeds?asset_type=crypto` and deserializes the JSON body into
/// [`PriceFeed`] values. No filtering is applied: every feed in the response is returned.
///
/// # Errors
///
/// Fails when the request cannot be sent, the status is not `200 OK`, the body cannot be
/// read, or the body does not deserialize into `Vec<PriceFeed>`.
pub async fn get_price_feeds() -> Result<Vec<PriceFeed>> {
    const ENDPOINT: &str = "https://hermes.pyth.network/v2/price_feeds?asset_type=crypto";

    let response = CLIENT
        .get(ENDPOINT)
        .send()
        .await
        .context("Failed to send request to Pyth Hermes API")?;

    // Anything other than a plain 200 is treated as a failure.
    let code = response.status();
    if code != StatusCode::OK {
        anyhow::bail!("Unexpected status code: {}", code);
    }

    let raw = response
        .text()
        .await
        .context("Failed to read response body from Hermes API")?;

    serde_json::from_str::<Vec<PriceFeed>>(&raw)
        .context("Failed to deserialize response into Vec<PriceFeed>")
}

// todo: remove this later
pub async fn get_live_price_stream<F, Fut>(ids: &Vec<String>, call_back: F) -> Result<String>
where
    F: Fn(Vec<TokenPriceInfo>) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = ()> + Send,
{
    let url = Url::parse_with_params(
        "https://hermes.pyth.network/v2/updates/price/stream",
        ids.iter().map(|id| ("ids[]", id.as_str())),
    )
    .map_err(|e| e)?;

    let res = CLIENT.get(url).send().await.map_err(|e| e)?;

    let stream = res.bytes_stream();
    tokio::pin!(stream);

    // Buffer for incomplete chunks
    let mut buffer = String::new();

    while let Some(chunk_result) = futures::StreamExt::next(&mut stream).await {
        match chunk_result {
            Ok(bytes) => {
                if let Ok(text) = std::str::from_utf8(&bytes) {
                    buffer.push_str(text);

                    while let Some(pos) = buffer.find('\n') {
                        let line = buffer[..pos].trim().to_string();
                        buffer = buffer[pos + 1..].to_string();

                        if line.starts_with("data:") {
                            let json_str = line[5..].trim();

                            match serde_json::from_str::<serde_json::Value>(json_str) {
                                Ok(parsed) => {
                                    let data = parsed["parsed"].as_array().unwrap();

                                    let result = parse_token_data(data).await.map_err(|e| e)?;

                                    call_back(result).await;
                                }
                                Err(e) => {
                                    eprintln!("❌ JSON parse error: {}", e);
                                    eprintln!("⛔ Raw data: {}", json_str);
                                }
                            }
                        }
                    }
                }
            }
            Err(e) => {
                eprintln!("❌ Stream error: {}", e);
                break;
            }
        }
    }

    Ok("Stream reading".to_string())
}

pub async fn get_token_price_info(ids: &Vec<String>) -> Result<Vec<TokenPriceInfo>> {
    let url = format!(
        "https://hermes.pyth.network/v2/updates/price/latest?{}",
        ids.iter()
            .map(|id| format!("ids[]={}", id))
            .collect::<Vec<_>>()
            .join("&")
    );

    let resp = CLIENT
        .get(&url)
        .send()
        .await
        .context("Failed to send price fetch request")?;

    let body = resp
        .text()
        .await
        .context("Failed to read latest price response body")?;

    let parsed: serde_json::Value =
        serde_json::from_str(&body).context("Failed to parse latest price response as JSON")?;
    let data = parsed["parsed"]
        .as_array()
        .context("Expected `parsed` field to be an array")?;

    let result = parse_token_data(data).await.map_err(|e| e)?;

    Ok(result)
}

/// Converts the entries of a Hermes `parsed` array into [`TokenPriceInfo`] records.
///
/// For each entry this reads the string `id`, and from both the `price` and `ema_price`
/// objects a string `price` and an integer `expo`. Each pair is converted to a number with
/// `apply_exponent`. `fluctuation_pct` is `((price - ema) / ema) * 100`, or `0.0` when the
/// EMA value is zero. `timestamp` is `price.publish_time`, or `0` when that is missing.
///
/// Name and symbol come from the feed whose id matches the entry. The symbol is the text
/// between the first `.` and the following `/` of the feed's `symbol` attribute. Both fall
/// back to `"Unknown"` when no feed matches or the symbol has no `.`.
///
/// Note: the feed list is fetched with [`get_price_feeds`] on every call, so each invocation
/// performs one HTTP request.
///
/// # Errors
///
/// Fails when [`get_price_feeds`] fails, when an entry lacks `id`, a price string or an
/// exponent, or when an exponent does not fit in an `i32`.
pub async fn parse_token_data(data: &Vec<Value>) -> Result<Vec<TokenPriceInfo>> {
    let tokens = get_price_feeds().await?;

    data.iter()
        .map(|entry| {
            let id = entry["id"]
                .as_str()
                .context("Missing `id` in price update")?
                .to_string();

            let price_detail = &entry["price"];
            let ema_detail = &entry["ema_price"];

            let price_str = price_detail["price"]
                .as_str()
                .context("Missing price string")?;
            let price_expo = price_detail["expo"]
                .as_i64()
                .context("Missing price expo")?;

            let ema_str = ema_detail["price"].as_str().context("Missing ema string")?;
            let ema_expo = ema_detail["expo"].as_i64().context("Missing ema expo")?;

            // Checked narrowing: an out-of-range exponent is reported instead of being
            // silently truncated by an `as` cast.
            let price_expo = i32::try_from(price_expo).context("Price expo out of range")?;
            let ema_expo = i32::try_from(ema_expo).context("Ema expo out of range")?;

            let price = apply_exponent(price_str, price_expo);
            let ema = apply_exponent(ema_str, ema_expo);
            // Guard the division: with a zero EMA the relative change is undefined.
            let fluctuation = if ema != 0.0 {
                ((price - ema) / ema) * 100.0
            } else {
                0.0
            };

            let token = tokens.iter().find(|t| t.id == id);
            let name = token
                .map(|t| t.attributes.description.clone())
                .unwrap_or_else(|| "Unknown".into());
            let token_symbol = token
                .and_then(|t| {
                    t.attributes
                        .symbol
                        .split('.')
                        .nth(1)?
                        .split('/')
                        .next()
                        .map(str::to_string)
                })
                .unwrap_or_else(|| "Unknown".into());

            Ok(TokenPriceInfo {
                name,
                token_id: id,
                token_symbol,
                price_30s: price,
                price_1m: ema,
                timestamp: price_detail["publish_time"].as_i64().unwrap_or_default(), // fallback to 0
                fluctuation_pct: fluctuation,
            })
        })
        .collect::<Result<Vec<_>>>()
}

pub async fn get_price_stream_for_duration(
    ids: Vec<String>,
    duration_secs: u64,
    tx: mpsc::Sender<TokenPriceInfo>,
) -> Result<(), Error> {
    println!("hereee");
    let url = Url::parse_with_params(
        "https://hermes.pyth.network/v2/updates/price/stream",
        ids.iter().map(|id| ("ids[]", id.as_str())),
    )
    .map_err(|e| e)?;

    let res = CLIENT.get(url).send().await.map_err(|e| e)?;

    let stream = res.bytes_stream();
    tokio::pin!(stream);

    let deadline = Instant::now() + Duration::from_secs(duration_secs);
    let mut buffer = String::new();
    let all_prices: Vec<TokenPriceInfo> = vec![];

    loop {
        tokio::select! {
            biased;

            _ = sleep_until(deadline) => {
                println!("⏱️ Stream time limit reached");

                break;
            }

            maybe_chunk = stream.next() => {
                match maybe_chunk {
                    Some(Ok(bytes)) => {
                        if let Ok(text) = std::str::from_utf8(&bytes) {
                            buffer.push_str(text);
                            while let Some(pos) = buffer.find('\n') {
                                let line = buffer[..pos].trim().to_string();
                                buffer = buffer[pos + 1..].to_string();

                                if line.starts_with("data:") {
                                    let json_str = line[5..].trim();

                                    match serde_json::from_str::<serde_json::Value>(json_str) {
                                        Ok(parsed) => {
                                            if let Some(data) = parsed["parsed"].as_array() {
                                                let prices = parse_token_data(data).await.map_err(|e| {
                                                    e
                                                })?;

                                                for price in prices {
                                                    // 🔥 Send each price immediately
                                                    if let Err(e) = tx.send(price).await {
                                                        eprintln!("❌ Failed to send price update: {}", e);
                                                        break;
                                                    }
                                                }
                                            }
                                            // if let Some(data) = parsed["parsed"].as_array() {
                                            //     let prices = parse_token_data(data).await.map_err(|e| {
                                            //         e
                                            //     })?;
                                            //     all_prices.extend(prices);
                                            // }
                                        }
                                        Err(e) => {
                                            eprintln!("❌ JSON parse error: {}", e);
                                            eprintln!("⛔ Raw data: {}", json_str);
                                        }
                                    }
                                }
                            }
                        }
                    }
                    Some(Err(e)) => {
                        eprintln!("❌ Stream error: {}", e);
                        break;
                    }
                    None => break,
                }
            }
        }
    }

    // println!("✅ Allll fetched: {:?}", all_prices);
    Ok(())
    // Ok(all_prices)
}

/// Looks up current price information for tokens given by their base symbols.
///
/// Each symbol is upper-cased and matched against the feeds from [`get_price_feeds`]: a feed
/// matches when its `base` attribute equals the symbol and its `display_symbol` equals
/// `"<SYMBOL>/USD"`. The first matching feed per symbol is used and symbols without a match
/// are ignored. Prices for the matched feeds are then fetched with [`get_token_price_info`].
///
/// Returns an empty vector, without requesting any prices, when no symbol matches a feed.
///
/// # Errors
///
/// Fails when [`get_price_feeds`] or [`get_token_price_info`] fails.
pub async fn search_by_token_symbols(symbols: Vec<&str>) -> Result<Vec<TokenPriceInfo>> {
    let price_feeds = get_price_feeds().await?;

    // Only the feed id is needed later, so collect ids directly instead of cloning feeds.
    let feed_ids: Vec<String> = symbols
        .iter()
        .filter_map(|symbol| {
            let base = symbol.to_uppercase();
            let target = format!("{}/USD", base);
            price_feeds
                .iter()
                .find(|feed| {
                    feed.attributes.base == base && target == feed.attributes.display_symbol
                })
                .map(|feed| feed.id.clone())
        })
        .collect();

    // Nothing matched: an empty result is the answer; a request without ids would be pointless.
    if feed_ids.is_empty() {
        return Ok(Vec::new());
    }

    get_token_price_info(&feed_ids)
        .await
        .context("Problem getting search results")
}