pyth_hermes_client_rust/
ws_connection.rs1use std::hash::{DefaultHasher, Hash, Hasher};
2
3use anyhow::anyhow;
4use anyhow::Result;
5use derive_more::From;
6use futures_util::{SinkExt, StreamExt, TryStreamExt};
7use serde::{Deserialize, Serialize};
8use tokio_tungstenite::{connect_async, tungstenite::Message};
9use tracing::warn;
10use url::Url;
11
12#[derive(Serialize, Debug, Clone)]
13#[serde(tag = "type")]
14pub enum HermesClientMessage {
15 #[serde(rename = "subscribe")]
16 Subscribe(HermesClientMessageSubscribe),
17 #[serde(rename = "unsubscribe")]
18 Unsubscribe(HermesClientMessageUnsubscribe),
19}
20
21#[derive(Serialize, Debug, Clone)]
22pub struct HermesClientMessageSubscribe {
23 pub ids: Vec<String>,
24 #[serde(default)]
25 pub verbose: bool,
26 #[serde(default)]
27 pub binary: bool,
28 #[serde(default)]
29 pub allow_out_of_order: bool,
30 #[serde(default)]
31 pub ignore_invalid_price_ids: bool,
32}
33
34#[derive(Serialize, Debug, Clone)]
35pub struct HermesClientMessageUnsubscribe {
36 pub ids: Vec<String>,
37}
38
39#[derive(Deserialize, Debug, Clone, Hash)]
40#[serde(tag = "type")]
41pub enum HermesServerMessage {
42 #[serde(rename = "response")]
43 Response(HermesServerResponseMessage),
44 #[serde(rename = "price_update")]
45 PriceUpdate { price_feed: HermesPriceFeed },
46}
47
48impl HermesServerMessage {
49 pub fn cache_key(&self) -> u64 {
50 let mut hasher = DefaultHasher::new();
51 self.hash(&mut hasher);
52 hasher.finish()
53 }
54}
55
56#[derive(Serialize, Deserialize, Debug, Clone, Hash)]
57#[serde(tag = "status")]
58pub enum HermesServerResponseMessage {
59 #[serde(rename = "success")]
60 Success,
61 #[serde(rename = "error")]
62 Err { error: String },
63}
64
65#[derive(Deserialize, Serialize, Debug, Clone, Hash)]
66pub struct HermesPriceFeed {
67 pub id: String,
68 pub price: HermesPrice,
69 pub ema_price: HermesPrice,
70 #[serde(skip_serializing_if = "Option::is_none")]
71 pub metadata: Option<HermesPriceFeedMetadata>,
72 #[serde(skip_serializing_if = "Option::is_none")]
73 pub vaa: Option<String>,
74}
75
76#[derive(Deserialize, Serialize, Debug, Clone, Hash)]
77pub struct HermesPrice {
78 #[serde(with = "pyth_sdk::utils::as_string")]
79 pub price: i64,
80 #[serde(with = "pyth_sdk::utils::as_string")]
81 pub conf: u64,
82 pub expo: i32,
84 pub publish_time: i64,
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
89pub struct HermesPriceFeedMetadata {
90 pub slot: Option<u64>,
91 pub emitter_chain: u16,
92 pub price_service_receive_time: Option<i64>,
93 pub prev_publish_time: Option<i64>,
94}
95
96pub struct HermesWSConnection {
104 endpoint: Url,
105 ws_sender: Option<
106 futures_util::stream::SplitSink<
107 tokio_tungstenite::WebSocketStream<
108 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
109 >,
110 Message,
111 >,
112 >,
113}
114
115impl HermesWSConnection {
116 pub fn new(endpoint: Url) -> Result<Self> {
124 Ok(Self {
125 endpoint,
126 ws_sender: None,
127 })
128 }
129
130 pub async fn start(
135 &mut self,
136 ) -> Result<impl futures_util::Stream<Item = Result<HermesServerMessage>>> {
137 let url = self.endpoint.clone();
138
139 let (ws_stream, _) = connect_async(url).await?;
140 let (ws_sender, ws_receiver) = ws_stream.split();
141
142 self.ws_sender = Some(ws_sender);
143 let response_stream =
144 ws_receiver
145 .map_err(anyhow::Error::from)
146 .try_filter_map(|msg| async {
147 let r: Result<Option<HermesServerMessage>> = match msg {
148 Message::Text(text) => {
149 Ok(Some(serde_json::from_str::<HermesServerMessage>(&text)?))
150 }
151 Message::Binary(_) => {
152 warn!("Received unexpected binary message");
153 Ok(None)
154 }
155 Message::Close(_) => {
156 Err(anyhow!("WebSocket connection closed unexpectedly"))
157 }
158 _ => Ok(None),
159 };
160 r
161 });
162
163 Ok(response_stream)
164 }
165
166 pub async fn send_request(&mut self, request: HermesClientMessage) -> Result<()> {
167 if let Some(sender) = &mut self.ws_sender {
168 let msg = serde_json::to_string(&request)?;
169 sender.send(Message::Text(msg)).await?;
170 Ok(())
171 } else {
172 anyhow::bail!("WebSocket connection not started")
173 }
174 }
175
176 pub async fn close(&mut self) -> Result<()> {
178 if let Some(sender) = &mut self.ws_sender {
179 sender.send(Message::Close(None)).await?;
180 self.ws_sender = None;
181 Ok(())
182 } else {
183 anyhow::bail!("WebSocket connection not started")
184 }
185 }
186}