pyth_hermes_client/
stream.rs

1use eventsource_stream::Eventsource as _;
2use futures_util::{Stream, StreamExt, TryStreamExt};
3use serde::Serialize;
4
5use crate::{EncodingType, Error, PriceIdInput, PriceUpdate};
6
7/// Streams
8impl crate::PythClient {
9    /// SSE route handler for streaming price updates.
10    ///
11    /// Arguments:
12    /// * `ids`: Get the most recent price update for this set of price feed ids.
13    /// * `encoding`: Optional encoding type. If set, return the price update in the encoding
14    ///   specified by the encoding parameter. Default is [`EncodingType::Hex`].
15    /// * `parsed`: If `true`, include the parsed price update in [`PriceUpdate::parsed`]. Defaults
16    ///   to `false` for this client.
17    /// * `allow_unordered`: If `true`, allows unordered price updates to be included in the stream.
18    /// * `benchmarks_only`: If `true`, only include benchmark prices that are the initial price
19    ///   updates at a given timestamp (i.e., prevPubTime != pubTime).
20    ///
21    /// /v2/updates/price/stream
22    pub async fn stream_price_updates(
23        &self,
24        ids: Vec<PriceIdInput>,
25        encoding: Option<EncodingType>,
26        parsed: Option<bool>,
27        allow_unordered: Option<bool>,
28        benchmarks_only: Option<bool>,
29    ) -> Result<impl Stream<Item = Result<PriceUpdate, Error>> + use<>, Error> {
30        #[derive(Serialize)]
31        struct Options {
32            encoding: Option<EncodingType>,
33            parsed: Option<bool>,
34            allow_unordered: Option<bool>,
35            benchmarks_only: Option<bool>,
36        }
37
38        let mut url = self.url.clone();
39        url.set_path("/v2/updates/price/stream");
40
41        let mut builder = self.client.get(url);
42        for id in ids {
43            builder = builder.query(&[("ids[]", id)]);
44        }
45        let request = builder
46            .query(&Options {
47                encoding,
48                parsed: parsed.or(Some(false)),
49                allow_unordered,
50                benchmarks_only,
51            })
52            .build()
53            .map_err(Error::RequestBuilder)?;
54
55        let update_stream = self
56            .client
57            .execute(request)
58            .await
59            .map_err(Error::Execute)?
60            .bytes_stream()
61            .eventsource()
62            .map_err(Error::EventStream)
63            .map(|e| -> Result<PriceUpdate, _> {
64                serde_json::from_str(&e?.data).map_err(Error::EventData)
65            });
66        Ok(update_stream)
67    }
68}