Skip to main content

gmsol_sdk/client/chainlink/pull_oracle/
pull_oracle_impl.rs

1use std::{
2    collections::HashMap,
3    ops::Deref,
4    sync::{Arc, RwLock},
5};
6
7use gmsol_solana_utils::bundle_builder::{BundleBuilder, BundleOptions};
8
9use gmsol_utils::oracle::PriceProviderKind;
10use solana_sdk::{pubkey::Pubkey, signer::Signer};
11use time::OffsetDateTime;
12
13use crate::{
14    client::{
15        feeds_parser::{FeedAddressMap, Feeds},
16        ops::oracle::OracleOps,
17        pull_oracle::{FeedIds, PostPullOraclePrices, PriceUpdateInstructions, PullOracle},
18    },
19    ops::oracle::ChainlinkPriceFeedUpdateArgs,
20};
21
22use super::{client::ApiReportData, Client, FeedId};
23
24/// Chainlink Pull Oracle Factory.
25pub struct ChainlinkPullOracleFactory {
26    chainlink_program: Pubkey,
27    access_controller: Pubkey,
28    store: Pubkey,
29    feed_index: u16,
30    feeds: RwLock<FeedAddressMap>,
31}
32
33impl ChainlinkPullOracleFactory {
34    /// Create a new [`ChainlinkPullOracleFactory`] with default program ID and access controller address.
35    pub fn new(store: &Pubkey, feed_index: u16, testnet: bool) -> Self {
36        use gmsol_chainlink_datastreams::verifier;
37
38        let access_controller = if testnet {
39            super::access_controller_address::DEVNET_ADDRESS
40        } else {
41            super::access_controller_address::ADDRESS
42        };
43
44        Self::with_program_id_and_access_controller(
45            store,
46            feed_index,
47            &verifier::ID,
48            &access_controller,
49        )
50    }
51
52    /// Wrap in an [`Arc`].
53    pub fn arced(self) -> Arc<Self> {
54        Arc::new(self)
55    }
56
57    /// Create a new [`ChainlinkPullOracleFactory`] with the given program ID and access controller address.
58    pub fn with_program_id_and_access_controller(
59        store: &Pubkey,
60        feed_index: u16,
61        chainlink_program: &Pubkey,
62        access_controller: &Pubkey,
63    ) -> Self {
64        Self {
65            chainlink_program: *chainlink_program,
66            access_controller: *access_controller,
67            store: *store,
68            feed_index,
69            feeds: Default::default(),
70        }
71    }
72
73    /// Prepare feed accounts but do not send.
74    pub async fn prepare_feeds_bundle<'a, C: Deref<Target = impl Signer> + Clone>(
75        &self,
76        gmsol: &'a crate::Client<C>,
77        feed_ids: HashMap<Pubkey, FeedId>,
78        options: BundleOptions,
79    ) -> crate::Result<BundleBuilder<'a, C>> {
80        let provider = PriceProviderKind::ChainlinkDataStreams;
81        let mut txs = gmsol.bundle_with_options(options);
82        let authority = gmsol.payer();
83        for (token, feed_id) in feed_ids {
84            let address = gmsol.find_price_feed_address(
85                &self.store,
86                &authority,
87                self.feed_index,
88                provider,
89                &token,
90            );
91            let feed_id = Pubkey::new_from_array(feed_id);
92            match gmsol.price_feed(&address).await? {
93                Some(feed) => {
94                    if feed.feed_id != feed_id {
95                        return Err(crate::Error::custom("feed_id mismatched"));
96                    }
97                }
98                None => {
99                    txs.push(
100                        gmsol
101                            .initialize_price_feed(
102                                &self.store,
103                                self.feed_index,
104                                provider,
105                                &token,
106                                &feed_id,
107                            )
108                            .0,
109                    )?;
110                }
111            }
112            self.feeds.write().unwrap().insert(feed_id, address);
113        }
114
115        let feeds = self
116            .feeds
117            .read()
118            .unwrap()
119            .values()
120            .copied()
121            .collect::<Vec<_>>();
122
123        tracing::info!("Using custom feeds: {feeds:#?}");
124
125        Ok(txs)
126    }
127
128    /// Prepare feed accounts for the given tokens and feed_ids.
129    pub async fn prepare_feeds<C: Deref<Target = impl Signer> + Clone>(
130        &self,
131        gmsol: &crate::Client<C>,
132        feed_ids: HashMap<Pubkey, FeedId>,
133    ) -> crate::Result<()> {
134        let txs = self
135            .prepare_feeds_bundle(gmsol, feed_ids, Default::default())
136            .await?;
137
138        if !txs.is_empty() {
139            match txs.build()?.send_all(false).await {
140                Ok(signatures) => {
141                    tracing::info!("initialized feeds with txs: {signatures:#?}");
142                }
143                Err((signatures, err)) => {
144                    tracing::error!(%err, "failed to initailize feeds, successful txs: {signatures:#?}");
145                }
146            }
147        }
148
149        Ok(())
150    }
151
152    /// Create [`ChainlinkPullOracle`].
153    pub fn make_oracle<'a, C>(
154        self: Arc<Self>,
155        chainlink: &'a Client,
156        gmsol: &'a crate::Client<C>,
157        skip_feeds_preparation: bool,
158    ) -> ChainlinkPullOracle<'a, C> {
159        ChainlinkPullOracle::new(chainlink, gmsol, self, skip_feeds_preparation)
160    }
161}
162
163/// Chainlink Pull Oracle.
164pub struct ChainlinkPullOracle<'a, C> {
165    chainlink: &'a Client,
166    gmsol: &'a crate::Client<C>,
167    ctx: Arc<ChainlinkPullOracleFactory>,
168    skip_feeds_preparation: bool,
169    authority: Option<&'a dyn Signer>,
170    idempotent: bool,
171}
172
173impl<C> Clone for ChainlinkPullOracle<'_, C> {
174    fn clone(&self) -> Self {
175        Self {
176            ctx: self.ctx.clone(),
177            ..*self
178        }
179    }
180}
181
182impl<'a, C> ChainlinkPullOracle<'a, C> {
183    /// Create a new [`ChainlinkPullOracle`] with default program ID and access controller address.
184    pub fn new(
185        chainlink: &'a Client,
186        gmsol: &'a crate::Client<C>,
187        ctx: Arc<ChainlinkPullOracleFactory>,
188        skip_feeds_preparation: bool,
189    ) -> Self {
190        Self {
191            chainlink,
192            gmsol,
193            ctx,
194            skip_feeds_preparation,
195            authority: None,
196            idempotent: true,
197        }
198    }
199
200    /// Returns a new oracle with the given authority.
201    pub fn with_authority(mut self, authority: Option<&'a dyn Signer>) -> Self {
202        self.authority = authority;
203        self
204    }
205
206    /// Returns a new oracle with the given `idempotent` option,
207    /// which defaults to `true`.
208    pub fn with_idempotent(mut self, idempotent: bool) -> Self {
209        self.idempotent = idempotent;
210        self
211    }
212}
213
214impl<C: Deref<Target = impl Signer> + Clone> ChainlinkPullOracle<'_, C> {
215    /// Prepare feed accounts but do not send.
216    pub async fn prepare_feeds_bundle(
217        &self,
218        feed_ids: &FeedIds,
219        options: BundleOptions,
220    ) -> crate::Result<BundleBuilder<C>> {
221        self.ctx
222            .prepare_feeds_bundle(self.gmsol, filter_feed_ids(feed_ids)?, options)
223            .await
224    }
225}
226
227impl<C: Deref<Target = impl Signer> + Clone> PullOracle for ChainlinkPullOracle<'_, C> {
228    type PriceUpdates = HashMap<FeedId, ApiReportData>;
229
230    async fn fetch_price_updates(
231        &self,
232        feed_ids: &FeedIds,
233        after: Option<OffsetDateTime>,
234    ) -> crate::Result<Self::PriceUpdates> {
235        let feeds = filter_feed_ids(feed_ids)?;
236
237        let feed_ids = feeds.values().map(hex::encode).collect::<Vec<_>>();
238
239        if !self.skip_feeds_preparation {
240            self.ctx.prepare_feeds(self.gmsol, feeds).await?;
241        }
242
243        let tasks = feed_ids
244            .iter()
245            .map(|feed_id| self.chainlink.latest_report(feed_id));
246        let price_updates = futures_util::future::try_join_all(tasks).await?;
247
248        let updates = price_updates
249            .into_iter()
250            .map(|report| {
251                let feed_id = report.decode_feed_id()?;
252                let ts = report.observations_timestamp;
253
254                if let Some(after) = after {
255                    let ts =
256                        OffsetDateTime::from_unix_timestamp(ts).map_err(crate::Error::custom)?;
257                    if after > ts {
258                        return Err(crate::Error::custom(format!(
259                            "price updates are too old, ts={ts}, required={after}"
260                        )));
261                    }
262                }
263
264                Ok((feed_id, report.into_data()))
265            })
266            .collect::<crate::Result<HashMap<_, _>>>()?;
267
268        Ok(updates)
269    }
270}
271
272impl<'a, C: Deref<Target = impl Signer> + Clone> PostPullOraclePrices<'a, C>
273    for ChainlinkPullOracle<'a, C>
274{
275    async fn fetch_price_update_instructions(
276        &self,
277        price_updates: &Self::PriceUpdates,
278        options: BundleOptions,
279    ) -> crate::Result<(
280        PriceUpdateInstructions<'a, C>,
281        HashMap<PriceProviderKind, FeedAddressMap>,
282    )> {
283        let mut txs = PriceUpdateInstructions::new(self.gmsol, options);
284        let mut map = HashMap::with_capacity(price_updates.len());
285
286        let feeds = self.ctx.feeds.read().unwrap();
287
288        {
289            let mut pg = txs.split_mut().0.push_parallel();
290            for (feed_id, update) in price_updates {
291                let feed_id = Pubkey::new_from_array(*feed_id);
292                tracing::info!("adding ix to post price update for {feed_id}");
293                let feed = feeds.get(&feed_id).ok_or_else(|| {
294                    crate::Error::custom(format!(
295                        "feed account for the given `feed_id` is not provided, feed_id = {feed_id}"
296                    ))
297                })?;
298                let rpc = self.gmsol.update_price_feed_with_chainlink_and_authority(
299                    &self.ctx.store,
300                    feed,
301                    ChainlinkPriceFeedUpdateArgs {
302                        chainlink: &self.ctx.chainlink_program,
303                        access_controller: &self.ctx.access_controller,
304                        signed_report: &update.report_bytes()?,
305                        idempotent: self.idempotent,
306                    },
307                    self.authority,
308                )?;
309                pg.add(rpc);
310                map.insert(feed_id, *feed);
311            }
312        }
313
314        Ok((
315            txs,
316            HashMap::from([(PriceProviderKind::ChainlinkDataStreams, map)]),
317        ))
318    }
319}
320
321/// Filter feed ids.
322pub fn filter_feed_ids(feed_ids: &FeedIds) -> crate::Result<HashMap<Pubkey, FeedId>> {
323    Feeds::new(feed_ids)
324        .filter_map(|res| {
325            res.map(|config| {
326                matches!(config.provider, PriceProviderKind::ChainlinkDataStreams)
327                    .then(|| (config.token, config.feed.to_bytes()))
328            })
329            .transpose()
330        })
331        .collect::<crate::Result<HashMap<_, _>>>()
332}