gmsol_sdk/client/chainlink/pull_oracle/
pull_oracle_impl.rs1use std::{
2 collections::HashMap,
3 ops::Deref,
4 sync::{Arc, RwLock},
5};
6
7use gmsol_solana_utils::bundle_builder::{BundleBuilder, BundleOptions};
8
9use gmsol_utils::oracle::PriceProviderKind;
10use solana_sdk::{pubkey::Pubkey, signer::Signer};
11use time::OffsetDateTime;
12
13use crate::{
14 client::{
15 feeds_parser::{FeedAddressMap, Feeds},
16 ops::oracle::OracleOps,
17 pull_oracle::{FeedIds, PostPullOraclePrices, PriceUpdateInstructions, PullOracle},
18 },
19 ops::oracle::ChainlinkPriceFeedUpdateArgs,
20};
21
22use super::{client::ApiReportData, Client, FeedId};
23
24pub struct ChainlinkPullOracleFactory {
26 chainlink_program: Pubkey,
27 access_controller: Pubkey,
28 store: Pubkey,
29 feed_index: u16,
30 feeds: RwLock<FeedAddressMap>,
31}
32
33impl ChainlinkPullOracleFactory {
34 pub fn new(store: &Pubkey, feed_index: u16, testnet: bool) -> Self {
36 use gmsol_chainlink_datastreams::verifier;
37
38 let access_controller = if testnet {
39 super::access_controller_address::DEVNET_ADDRESS
40 } else {
41 super::access_controller_address::ADDRESS
42 };
43
44 Self::with_program_id_and_access_controller(
45 store,
46 feed_index,
47 &verifier::ID,
48 &access_controller,
49 )
50 }
51
52 pub fn arced(self) -> Arc<Self> {
54 Arc::new(self)
55 }
56
57 pub fn with_program_id_and_access_controller(
59 store: &Pubkey,
60 feed_index: u16,
61 chainlink_program: &Pubkey,
62 access_controller: &Pubkey,
63 ) -> Self {
64 Self {
65 chainlink_program: *chainlink_program,
66 access_controller: *access_controller,
67 store: *store,
68 feed_index,
69 feeds: Default::default(),
70 }
71 }
72
73 pub async fn prepare_feeds_bundle<'a, C: Deref<Target = impl Signer> + Clone>(
75 &self,
76 gmsol: &'a crate::Client<C>,
77 feed_ids: HashMap<Pubkey, FeedId>,
78 options: BundleOptions,
79 ) -> crate::Result<BundleBuilder<'a, C>> {
80 let provider = PriceProviderKind::ChainlinkDataStreams;
81 let mut txs = gmsol.bundle_with_options(options);
82 let authority = gmsol.payer();
83 for (token, feed_id) in feed_ids {
84 let address = gmsol.find_price_feed_address(
85 &self.store,
86 &authority,
87 self.feed_index,
88 provider,
89 &token,
90 );
91 let feed_id = Pubkey::new_from_array(feed_id);
92 match gmsol.price_feed(&address).await? {
93 Some(feed) => {
94 if feed.feed_id != feed_id {
95 return Err(crate::Error::custom("feed_id mismatched"));
96 }
97 }
98 None => {
99 txs.push(
100 gmsol
101 .initialize_price_feed(
102 &self.store,
103 self.feed_index,
104 provider,
105 &token,
106 &feed_id,
107 )
108 .0,
109 )?;
110 }
111 }
112 self.feeds.write().unwrap().insert(feed_id, address);
113 }
114
115 let feeds = self
116 .feeds
117 .read()
118 .unwrap()
119 .values()
120 .copied()
121 .collect::<Vec<_>>();
122
123 tracing::info!("Using custom feeds: {feeds:#?}");
124
125 Ok(txs)
126 }
127
128 pub async fn prepare_feeds<C: Deref<Target = impl Signer> + Clone>(
130 &self,
131 gmsol: &crate::Client<C>,
132 feed_ids: HashMap<Pubkey, FeedId>,
133 ) -> crate::Result<()> {
134 let txs = self
135 .prepare_feeds_bundle(gmsol, feed_ids, Default::default())
136 .await?;
137
138 if !txs.is_empty() {
139 match txs.build()?.send_all(false).await {
140 Ok(signatures) => {
141 tracing::info!("initialized feeds with txs: {signatures:#?}");
142 }
143 Err((signatures, err)) => {
144 tracing::error!(%err, "failed to initailize feeds, successful txs: {signatures:#?}");
145 }
146 }
147 }
148
149 Ok(())
150 }
151
152 pub fn make_oracle<'a, C>(
154 self: Arc<Self>,
155 chainlink: &'a Client,
156 gmsol: &'a crate::Client<C>,
157 skip_feeds_preparation: bool,
158 ) -> ChainlinkPullOracle<'a, C> {
159 ChainlinkPullOracle::new(chainlink, gmsol, self, skip_feeds_preparation)
160 }
161}
162
163pub struct ChainlinkPullOracle<'a, C> {
165 chainlink: &'a Client,
166 gmsol: &'a crate::Client<C>,
167 ctx: Arc<ChainlinkPullOracleFactory>,
168 skip_feeds_preparation: bool,
169 authority: Option<&'a dyn Signer>,
170 idempotent: bool,
171}
172
173impl<C> Clone for ChainlinkPullOracle<'_, C> {
174 fn clone(&self) -> Self {
175 Self {
176 ctx: self.ctx.clone(),
177 ..*self
178 }
179 }
180}
181
182impl<'a, C> ChainlinkPullOracle<'a, C> {
183 pub fn new(
185 chainlink: &'a Client,
186 gmsol: &'a crate::Client<C>,
187 ctx: Arc<ChainlinkPullOracleFactory>,
188 skip_feeds_preparation: bool,
189 ) -> Self {
190 Self {
191 chainlink,
192 gmsol,
193 ctx,
194 skip_feeds_preparation,
195 authority: None,
196 idempotent: true,
197 }
198 }
199
200 pub fn with_authority(mut self, authority: Option<&'a dyn Signer>) -> Self {
202 self.authority = authority;
203 self
204 }
205
206 pub fn with_idempotent(mut self, idempotent: bool) -> Self {
209 self.idempotent = idempotent;
210 self
211 }
212}
213
214impl<C: Deref<Target = impl Signer> + Clone> ChainlinkPullOracle<'_, C> {
215 pub async fn prepare_feeds_bundle(
217 &self,
218 feed_ids: &FeedIds,
219 options: BundleOptions,
220 ) -> crate::Result<BundleBuilder<C>> {
221 self.ctx
222 .prepare_feeds_bundle(self.gmsol, filter_feed_ids(feed_ids)?, options)
223 .await
224 }
225}
226
227impl<C: Deref<Target = impl Signer> + Clone> PullOracle for ChainlinkPullOracle<'_, C> {
228 type PriceUpdates = HashMap<FeedId, ApiReportData>;
229
230 async fn fetch_price_updates(
231 &self,
232 feed_ids: &FeedIds,
233 after: Option<OffsetDateTime>,
234 ) -> crate::Result<Self::PriceUpdates> {
235 let feeds = filter_feed_ids(feed_ids)?;
236
237 let feed_ids = feeds.values().map(hex::encode).collect::<Vec<_>>();
238
239 if !self.skip_feeds_preparation {
240 self.ctx.prepare_feeds(self.gmsol, feeds).await?;
241 }
242
243 let tasks = feed_ids
244 .iter()
245 .map(|feed_id| self.chainlink.latest_report(feed_id));
246 let price_updates = futures_util::future::try_join_all(tasks).await?;
247
248 let updates = price_updates
249 .into_iter()
250 .map(|report| {
251 let feed_id = report.decode_feed_id()?;
252 let ts = report.observations_timestamp;
253
254 if let Some(after) = after {
255 let ts =
256 OffsetDateTime::from_unix_timestamp(ts).map_err(crate::Error::custom)?;
257 if after > ts {
258 return Err(crate::Error::custom(format!(
259 "price updates are too old, ts={ts}, required={after}"
260 )));
261 }
262 }
263
264 Ok((feed_id, report.into_data()))
265 })
266 .collect::<crate::Result<HashMap<_, _>>>()?;
267
268 Ok(updates)
269 }
270}
271
272impl<'a, C: Deref<Target = impl Signer> + Clone> PostPullOraclePrices<'a, C>
273 for ChainlinkPullOracle<'a, C>
274{
275 async fn fetch_price_update_instructions(
276 &self,
277 price_updates: &Self::PriceUpdates,
278 options: BundleOptions,
279 ) -> crate::Result<(
280 PriceUpdateInstructions<'a, C>,
281 HashMap<PriceProviderKind, FeedAddressMap>,
282 )> {
283 let mut txs = PriceUpdateInstructions::new(self.gmsol, options);
284 let mut map = HashMap::with_capacity(price_updates.len());
285
286 let feeds = self.ctx.feeds.read().unwrap();
287
288 {
289 let mut pg = txs.split_mut().0.push_parallel();
290 for (feed_id, update) in price_updates {
291 let feed_id = Pubkey::new_from_array(*feed_id);
292 tracing::info!("adding ix to post price update for {feed_id}");
293 let feed = feeds.get(&feed_id).ok_or_else(|| {
294 crate::Error::custom(format!(
295 "feed account for the given `feed_id` is not provided, feed_id = {feed_id}"
296 ))
297 })?;
298 let rpc = self.gmsol.update_price_feed_with_chainlink_and_authority(
299 &self.ctx.store,
300 feed,
301 ChainlinkPriceFeedUpdateArgs {
302 chainlink: &self.ctx.chainlink_program,
303 access_controller: &self.ctx.access_controller,
304 signed_report: &update.report_bytes()?,
305 idempotent: self.idempotent,
306 },
307 self.authority,
308 )?;
309 pg.add(rpc);
310 map.insert(feed_id, *feed);
311 }
312 }
313
314 Ok((
315 txs,
316 HashMap::from([(PriceProviderKind::ChainlinkDataStreams, map)]),
317 ))
318 }
319}
320
321pub fn filter_feed_ids(feed_ids: &FeedIds) -> crate::Result<HashMap<Pubkey, FeedId>> {
323 Feeds::new(feed_ids)
324 .filter_map(|res| {
325 res.map(|config| {
326 matches!(config.provider, PriceProviderKind::ChainlinkDataStreams)
327 .then(|| (config.token, config.feed.to_bytes()))
328 })
329 .transpose()
330 })
331 .collect::<crate::Result<HashMap<_, _>>>()
332}