use std::{
collections::{HashMap, HashSet},
fmt,
};
use eventsource_stream::Eventsource;
use futures_util::{Stream, TryStreamExt};
use gmsol_utils::{
market::HasMarketMeta,
oracle::{pyth_price_with_confidence_to_price, PriceProviderKind},
token_config::TokenMapAccess,
};
use reqwest::{Client, IntoUrl, Url};
pub use pyth_sdk::Identifier;
use crate::client::pyth::pubkey_to_identifier;
/// Default base URL of the Hermes service; `Hermes::default()` parses it into the client's base.
pub const DEFAULT_HERMES_BASE: &str = "https://hermes.pyth.network";
/// Path joined onto the base URL by `Hermes::price_updates` to open the price update stream.
pub const PRICE_STREAM: &str = "/v2/updates/price/stream";
/// Path joined onto the base URL by `Hermes::latest_price_updates` to fetch the latest update.
pub const PRICE_LATEST: &str = "/v2/updates/price/latest";
/// Client for a Hermes price service.
///
/// Holds the base URL that endpoint paths are joined onto and the HTTP
/// client used to send the requests.
#[derive(Debug, Clone)]
pub struct Hermes {
/// Base URL; endpoint paths such as `PRICE_STREAM` and `PRICE_LATEST` are joined onto it.
base: Url,
/// HTTP client used for every request issued by this value.
client: Client,
}
impl Hermes {
    /// Creates a client that sends its requests to `base`.
    ///
    /// # Errors
    ///
    /// Returns an error if `base` cannot be converted into a URL.
    pub fn try_new(base: impl IntoUrl) -> crate::Result<Self> {
        Ok(Self {
            base: base.into_url()?,
            client: Client::new(),
        })
    }

    /// Opens the price update event stream for the given feeds.
    ///
    /// `feed_ids` are sent as `ids[]` query parameters. When `encoding` is
    /// `None`, the query falls back to [`EncodingType::Base64`].
    ///
    /// Events whose payload cannot be deserialized into a [`PriceUpdate`] are
    /// logged with `tracing::warn!` and dropped from the stream instead of
    /// terminating it.
    ///
    /// # Errors
    ///
    /// Returns an error if the endpoint URL cannot be built, if sending the
    /// request fails, or if the server answers with an error status (4xx/5xx).
    /// Errors of the underlying event stream are yielded as `Err` items.
    pub async fn price_updates(
        &self,
        feed_ids: impl IntoIterator<Item = &Identifier>,
        encoding: Option<EncodingType>,
    ) -> crate::Result<impl Stream<Item = crate::Result<PriceUpdate>> + 'static> {
        let params = get_query(feed_ids, encoding);
        let stream = self
            .client
            .get(self.base.join(PRICE_STREAM).map_err(crate::Error::custom)?)
            .query(&params)
            .send()
            .await?
            // Fail early on 4xx/5xx instead of trying to read an error page
            // as an event stream.
            .error_for_status()?
            .bytes_stream()
            .eventsource()
            .map_err(crate::Error::custom)
            .try_filter_map(|event| {
                // A malformed event is reported and skipped (`None`), so a
                // single bad payload does not end the stream.
                let update = deserialize_price_update_event(&event)
                    .inspect_err(
                        |err| tracing::warn!(%err, ?event, "deserialize price update error"),
                    )
                    .ok();
                async { Ok(update) }
            });
        Ok(stream)
    }

    /// Fetches the latest price update for the given feeds.
    ///
    /// `feed_ids` are sent as `ids[]` query parameters. When `encoding` is
    /// `None`, the query falls back to [`EncodingType::Base64`].
    ///
    /// # Errors
    ///
    /// Returns an error if the endpoint URL cannot be built, if sending the
    /// request fails, if the server answers with an error status (4xx/5xx),
    /// or if the response body is not a valid JSON [`PriceUpdate`].
    pub async fn latest_price_updates(
        &self,
        feed_ids: impl IntoIterator<Item = &Identifier>,
        encoding: Option<EncodingType>,
    ) -> crate::Result<PriceUpdate> {
        let params = get_query(feed_ids, encoding);
        let update = self
            .client
            .get(self.base.join(PRICE_LATEST).map_err(crate::Error::custom)?)
            .query(&params)
            .send()
            .await?
            // Surface HTTP failures directly rather than as a confusing JSON
            // decode error on the error body.
            .error_for_status()?
            .json()
            .await?;
        Ok(update)
    }

    /// Fetches the latest Pyth prices for the tokens of `market` and converts
    /// them into unit prices.
    ///
    /// The token configs are looked up through `token_map`; the Pyth feed of
    /// each config is queried in a single request, and the resulting prices
    /// are assigned, in config order, to the index, long and short token.
    ///
    /// # Errors
    ///
    /// Returns an error if the token configs of the market are missing, if a
    /// config has no usable Pyth feed, if the request fails, if a returned
    /// feed id is not valid hex, if the response lacks a price for one of the
    /// feeds, or if a price cannot be converted.
    ///
    /// # Panics
    ///
    /// Panics if the number of token configs is not exactly three.
    /// NOTE(review): presumably `token_configs_for_market` always yields three
    /// configs (index, long, short) — confirm against its definition.
    pub async fn unit_prices_for_market(
        &self,
        token_map: &impl TokenMapAccess,
        market: &impl HasMarketMeta,
    ) -> crate::Result<gmsol_model::price::Prices<u128>> {
        let token_configs = token_map
            .token_configs_for_market(market)
            .ok_or_else(|| crate::Error::custom("missing configs for the tokens of the market"))?;

        // One feed id per token config, in the same order as the configs.
        let feeds = token_configs
            .iter()
            .map(|config| {
                config
                    .get_feed(&PriceProviderKind::Pyth)
                    .map(|feed| pubkey_to_identifier(&feed))
            })
            .collect::<Result<Vec<_>, _>>()
            .map_err(crate::Error::custom)?;

        // Deduplicate the ids for the request: several tokens may share a feed.
        let update = self
            .latest_price_updates(feeds.iter().collect::<HashSet<_>>(), None)
            .await?;

        // Index the parsed prices by feed id for lookup below.
        let prices = update
            .parsed
            .iter()
            .map(|price| {
                Ok((
                    Identifier::from_hex(price.id()).map_err(crate::Error::custom)?,
                    &price.price,
                ))
            })
            .collect::<crate::Result<HashMap<Identifier, _>>>()?;

        // `feeds` was built element-by-element from `token_configs`, so
        // zipping pairs every feed with the config it came from.
        let [index_token_price, long_token_price, short_token_price] = feeds
            .iter()
            .zip(token_configs.iter())
            .map(|(feed, &config)| {
                let price = prices
                    .get(feed)
                    .ok_or_else(|| crate::Error::custom(format!("missing price for {feed}")))?;
                let price = pyth_price_with_confidence_to_price(
                    price.price,
                    price.conf,
                    price.expo,
                    config,
                )
                .map_err(crate::Error::custom)?;
                Ok(gmsol_model::price::Price {
                    min: price.min.to_unit_price(),
                    max: price.max.to_unit_price(),
                })
            })
            .collect::<crate::Result<Vec<_>>>()?
            .try_into()
            .expect("must success");

        Ok(gmsol_model::price::Prices {
            index_token_price,
            long_token_price,
            short_token_price,
        })
    }
}
impl Default for Hermes {
fn default() -> Self {
Self {
base: DEFAULT_HERMES_BASE.parse().unwrap(),
client: Default::default(),
}
}
}
/// Decodes the JSON carried in the `data` field of a server-sent event into a [`PriceUpdate`].
///
/// # Errors
///
/// Returns an error if the data is not valid JSON for a [`PriceUpdate`].
fn deserialize_price_update_event(event: &eventsource_stream::Event) -> crate::Result<PriceUpdate> {
    let update = serde_json::from_str::<PriceUpdate>(event.data.as_str())?;
    Ok(update)
}
/// A price update, as returned by `Hermes::latest_price_updates` or decoded
/// from an event of the price update stream.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct PriceUpdate {
/// Binary section of the update: an encoding tag plus the encoded data strings.
pub(crate) binary: BinaryPriceUpdate,
/// Parsed per-feed updates; deserializes to an empty list when the field is absent.
#[serde(default)]
parsed: Vec<ParsedPriceUpdate>,
}
impl PriceUpdate {
    /// Returns the parsed per-feed updates as a slice.
    pub fn parsed(&self) -> &[ParsedPriceUpdate] {
        self.parsed.as_slice()
    }

    /// Returns the smallest `publish_time` among the parsed updates, or
    /// `None` when there are no parsed updates.
    pub fn min_timestamp(&self) -> Option<i64> {
        let publish_times = self.parsed.iter().map(|entry| entry.price.publish_time);
        publish_times.min()
    }

    /// Returns the binary section of this update.
    pub fn binary(&self) -> &BinaryPriceUpdate {
        let Self { binary, .. } = self;
        binary
    }
}
/// Binary section of a [`PriceUpdate`]: a list of encoded strings together with an encoding tag.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BinaryPriceUpdate {
/// Encoding tag. NOTE(review): presumably the encoding of the strings in `data` — confirm.
pub(crate) encoding: EncodingType,
/// Encoded payload strings.
pub(crate) data: Vec<String>,
}
/// Encoding selector, (de)serialized as the strings `"hex"` and `"base64"`.
///
/// The derived `Default` is [`EncodingType::Hex`]. Note that `get_query`
/// falls back to [`EncodingType::Base64`] (not this default) when no
/// encoding is supplied.
#[derive(Clone, Copy, Debug, Default, serde::Deserialize, serde::Serialize)]
pub enum EncodingType {
/// Serialized as `"hex"`; the derived default variant.
#[default]
#[serde(rename = "hex")]
Hex,
/// Serialized as `"base64"`.
#[serde(rename = "base64")]
Base64,
}
impl fmt::Display for EncodingType {
    /// Writes the lowercase name of the encoding: `hex` or `base64`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::Hex => "hex",
            Self::Base64 => "base64",
        };
        f.write_str(name)
    }
}
/// Parsed update for a single price feed.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct ParsedPriceUpdate {
/// Feed id as a hex string (`Hermes::unit_prices_for_market` decodes it with `Identifier::from_hex`).
id: String,
/// Price reported for the feed.
price: Price,
/// NOTE(review): presumably the exponential-moving-average price of the feed — confirm.
ema_price: Price,
/// Additional metadata delivered with the update.
metadata: Metadata,
}
impl ParsedPriceUpdate {
/// Returns the feed id string.
pub fn id(&self) -> &str {
self.id.as_str()
}
/// Returns the `price` field of this update.
pub fn price(&self) -> &Price {
&self.price
}
/// Returns the `ema_price` field of this update.
pub fn ema_price(&self) -> &Price {
&self.ema_price
}
/// Returns the metadata of this update.
pub fn metadata(&self) -> &Metadata {
&self.metadata
}
}
/// Price value with confidence, exponent and publish time.
///
/// `price` and `conf` are (de)serialized through `pyth_sdk::utils::as_string`.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Price {
/// Raw price value. NOTE(review): presumably to be scaled by `10^expo` — confirm.
#[serde(with = "pyth_sdk::utils::as_string")]
price: i64,
/// Confidence value reported with the price.
#[serde(with = "pyth_sdk::utils::as_string")]
conf: u64,
/// Exponent reported with the price.
expo: i32,
/// Publish time of the price. NOTE(review): presumably a Unix timestamp — confirm the unit.
publish_time: i64,
}
impl Price {
/// Returns the raw price value.
pub fn price(&self) -> i64 {
self.price
}
/// Returns the confidence value.
pub fn conf(&self) -> u64 {
self.conf
}
/// Returns the exponent.
pub fn expo(&self) -> i32 {
self.expo
}
/// Returns the publish time.
pub fn publish_time(&self) -> i64 {
self.publish_time
}
}
/// Optional metadata attached to a [`ParsedPriceUpdate`]; every field may be absent.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Metadata {
/// Slot value, if provided.
slot: Option<u64>,
/// Proof-available time, if provided.
proof_available_time: Option<i64>,
/// Previous publish time, if provided.
prev_publish_time: Option<i64>,
}
impl Metadata {
/// Returns the slot, if present.
pub fn slot(&self) -> Option<u64> {
self.slot
}
/// Returns the proof-available time, if present.
pub fn proof_available_time(&self) -> Option<i64> {
self.proof_available_time
}
/// Returns the previous publish time, if present.
pub fn prev_publish_time(&self) -> Option<i64> {
self.prev_publish_time
}
}
/// Builds the query parameters shared by the price endpoints.
///
/// Emits one `("ids[]", <hex id>)` pair per feed id, in iteration order,
/// followed by a single `("encoding", ..)` pair. When `encoding` is `None`,
/// [`EncodingType::Base64`] is used.
fn get_query<'a>(
    feed_ids: impl IntoIterator<Item = &'a Identifier>,
    encoding: Option<EncodingType>,
) -> Vec<(&'static str, String)> {
    let encoding = encoding.unwrap_or(EncodingType::Base64);
    let mut query: Vec<(&'static str, String)> = feed_ids
        .into_iter()
        .map(|id| ("ids[]", id.to_hex()))
        .collect();
    // The encoding parameter always comes last, after all feed ids.
    query.push(("encoding", encoding.to_string()));
    query
}