use std::{
collections::{hash_map::Entry, HashMap},
ops::Deref,
};
use either::Either;
use gmsol_solana_utils::bundle_builder::BundleOptions;
use gmsol_utils::oracle::PriceProviderKind;
use pythnet_sdk::wire::v1::AccumulatorUpdateData;
use solana_sdk::{pubkey::Pubkey, signature::Keypair, signer::Signer};
use time::OffsetDateTime;
use crate::client::{
feeds_parser::FeedAddressMap,
pull_oracle::{FeedIds, PostPullOraclePrices, PriceUpdateInstructions, PullOracle},
};
use super::{
hermes::{BinaryPriceUpdate, EncodingType, Hermes},
utils, PythPullOracle, PythReceiverOps, WormholeOps, VAA_SPLIT_INDEX,
};
pub struct PythPullOracleWithHermes<'a, C> {
gmsol: &'a crate::Client<C>,
hermes: &'a Hermes,
oracle: &'a PythPullOracle<C>,
}
pub struct PriceUpdates {
num_feeds: Option<usize>,
updates: Vec<BinaryPriceUpdate>,
}
impl From<Vec<BinaryPriceUpdate>> for PriceUpdates {
fn from(value: Vec<BinaryPriceUpdate>) -> Self {
Self {
num_feeds: None,
updates: value,
}
}
}
impl<'a, C> PythPullOracleWithHermes<'a, C> {
pub fn from_parts(
gmsol: &'a crate::Client<C>,
hermes: &'a Hermes,
oracle: &'a PythPullOracle<C>,
) -> Self {
Self {
gmsol,
hermes,
oracle,
}
}
}
impl<C> PullOracle for PythPullOracleWithHermes<'_, C> {
type PriceUpdates = PriceUpdates;
async fn fetch_price_updates(
&self,
feed_ids: &FeedIds,
after: Option<OffsetDateTime>,
) -> crate::Result<Self::PriceUpdates> {
let feed_ids = utils::extract_pyth_feed_ids(feed_ids)?;
if feed_ids.is_empty() {
return Ok(PriceUpdates {
num_feeds: Some(0),
updates: vec![],
});
}
let update = self
.hermes
.latest_price_updates(&feed_ids, Some(EncodingType::Base64))
.await?;
if let Some(after) = after {
let min_ts = update
.min_timestamp()
.ok_or_else(|| crate::Error::custom("empty price updates"))?;
let min_ts =
OffsetDateTime::from_unix_timestamp(min_ts).map_err(crate::Error::custom)?;
if min_ts < after {
return Err(crate::Error::custom(format!(
"price updates are too old, min_ts={min_ts}, required={after}"
)));
}
}
Ok(PriceUpdates {
num_feeds: Some(feed_ids.len()),
updates: vec![update.binary],
})
}
}
impl<C> PullOracle for &PythPullOracleWithHermes<'_, C> {
type PriceUpdates = PriceUpdates;
async fn fetch_price_updates(
&self,
feed_ids: &FeedIds,
after: Option<OffsetDateTime>,
) -> crate::Result<Self::PriceUpdates> {
(*self).fetch_price_updates(feed_ids, after).await
}
}
impl<'a, C: Deref<Target = impl Signer> + Clone> PostPullOraclePrices<'a, C>
for PythPullOracleWithHermes<'a, C>
{
async fn fetch_price_update_instructions(
&self,
price_updates: &Self::PriceUpdates,
options: BundleOptions,
) -> crate::Result<(
PriceUpdateInstructions<'a, C>,
HashMap<PriceProviderKind, FeedAddressMap>,
)> {
let mut ixns = PriceUpdateInstructions::new(self.gmsol, options);
let PriceUpdates { updates, num_feeds } = price_updates;
if updates.is_empty() {
return Ok((ixns, Default::default()));
}
let mut prices = HashMap::with_capacity(num_feeds.unwrap_or(0));
let wormhole = &self.oracle.wormhole;
let pyth = &self.oracle.pyth;
let datas = updates
.iter()
.flat_map(
|update| match utils::parse_accumulator_update_datas(update) {
Ok(datas) => Either::Left(datas.into_iter().map(Ok)),
Err(err) => Either::Right(std::iter::once(Err(err))),
},
)
.collect::<crate::Result<Vec<AccumulatorUpdateData>>>()?;
let mut updates = HashMap::<_, _>::default();
for data in datas.iter() {
let proof = &data.proof;
for update in utils::get_merkle_price_updates(proof) {
let feed_id = utils::parse_feed_id(update)?;
updates.insert(feed_id, (proof, update));
}
}
let mut encoded_vaas = HashMap::<_, _>::default();
let mut vaas = HashMap::<_, _>::default();
for (proof, _) in updates.values() {
let vaa = utils::get_vaa_buffer(proof);
if let Entry::Vacant(entry) = vaas.entry(vaa) {
let guardian_set_index = utils::get_guardian_set_index(proof)?;
let mut pubkey: Pubkey;
loop {
let keypair = Keypair::new();
pubkey = keypair.pubkey();
match encoded_vaas.entry(pubkey) {
Entry::Vacant(entry) => {
entry.insert(keypair);
break;
}
Entry::Occupied(_) => continue,
}
}
entry.insert((pubkey, guardian_set_index));
}
}
for (vaa, (pubkey, guardian_set_index)) in vaas.iter() {
let draft_vaa = encoded_vaas.remove(pubkey).expect("must exist");
let create = wormhole
.create_encoded_vaa(draft_vaa, vaa.len() as u64)
.await?;
let draft_vaa = pubkey;
let write_1 = wormhole.write_encoded_vaa(draft_vaa, 0, &vaa[0..VAA_SPLIT_INDEX]);
let write_2 = wormhole.write_encoded_vaa(
draft_vaa,
VAA_SPLIT_INDEX as u32,
&vaa[VAA_SPLIT_INDEX..],
);
let verify = wormhole.verify_encoded_vaa_v1(draft_vaa, *guardian_set_index);
ixns.try_push_post(create.clear_output())
.map_err(|(_, err)| err)?;
ixns.try_push_post(write_1).map_err(|(_, err)| err)?;
ixns.try_push_post(write_2).map_err(|(_, err)| err)?;
ixns.try_push_post(verify).map_err(|(_, err)| err)?;
let close_encoded_vaa = wormhole.close_encoded_vaa(draft_vaa);
ixns.try_push_close(close_encoded_vaa)
.map_err(|(_, err)| err)?;
}
let (post, close) = ixns.split_mut();
{
let mut post = post.push_parallel();
let mut close = close.push_parallel();
for (feed_id, (proof, update)) in updates {
let price_update = Keypair::new();
let vaa = utils::get_vaa_buffer(proof);
let Some((encoded_vaa, _)) = vaas.get(vaa) else {
continue;
};
let (post_price_update, price_update) = pyth
.post_price_update(price_update, update, encoded_vaa)?
.swap_output(());
prices.insert(Pubkey::new_from_array(feed_id.to_bytes()), price_update);
post.add(post_price_update);
close.add(pyth.reclaim_rent(&price_update));
}
}
Ok((ixns, HashMap::from([(PriceProviderKind::Pyth, prices)])))
}
}
impl<'r, 'a, C: Deref<Target = impl Signer> + Clone> PostPullOraclePrices<'a, C>
for &'r PythPullOracleWithHermes<'a, C>
where
'r: 'a,
{
async fn fetch_price_update_instructions(
&self,
price_updates: &Self::PriceUpdates,
options: BundleOptions,
) -> crate::Result<(
PriceUpdateInstructions<'a, C>,
HashMap<PriceProviderKind, FeedAddressMap>,
)> {
(*self)
.fetch_price_update_instructions(price_updates, options)
.await
}
}