pyth_hermes_client/
stream.rs1use eventsource_stream::Eventsource as _;
2use futures::{Stream, StreamExt, TryStreamExt};
3use serde::Serialize;
4
5use crate::{EncodingType, Error, PriceIdInput, PriceUpdate};
6
7impl crate::PythClient {
9 pub async fn stream_price_updates(
23 &self,
24 ids: Vec<PriceIdInput>,
25 encoding: Option<EncodingType>,
26 parsed: Option<bool>,
27 allow_unordered: Option<bool>,
28 benchmarks_only: Option<bool>,
29 ) -> Result<impl Stream<Item = Result<PriceUpdate, Error>> + use<>, Error> {
30 #[derive(Serialize)]
31 struct Options {
32 encoding: Option<EncodingType>,
33 parsed: Option<bool>,
34 allow_unordered: Option<bool>,
35 benchmarks_only: Option<bool>,
36 }
37
38 let mut url = self.url.clone();
39 url.set_path("/v2/updates/price/stream");
40
41 let mut builder = self.client.get(url);
42 for id in ids {
43 builder = builder.query(&[("ids[]", id)]);
44 }
45 let request = builder
46 .query(&Options {
47 encoding,
48 parsed: parsed.or(Some(false)),
49 allow_unordered,
50 benchmarks_only,
51 })
52 .build()
53 .map_err(Error::RequestBuilder)?;
54
55 let update_stream = self
56 .client
57 .execute(request)
58 .await
59 .map_err(Error::Execute)?
60 .bytes_stream()
61 .eventsource()
62 .map_err(Error::EventStream)
63 .map(|e| -> Result<PriceUpdate, _> {
64 serde_json::from_str(&e?.data).map_err(Error::EventData)
65 });
66 Ok(update_stream)
67 }
68}