pyth_hermes_client_rust/
ws_connection.rs

1use std::hash::{DefaultHasher, Hash, Hasher};
2
3use anyhow::anyhow;
4use anyhow::Result;
5use derive_more::From;
6use futures_util::{SinkExt, StreamExt, TryStreamExt};
7use serde::{Deserialize, Serialize};
8use tokio_tungstenite::{connect_async, tungstenite::Message};
9use tracing::warn;
10use url::Url;
11
12#[derive(Serialize, Debug, Clone)]
13#[serde(tag = "type")]
14pub enum HermesClientMessage {
15    #[serde(rename = "subscribe")]
16    Subscribe(HermesClientMessageSubscribe),
17    #[serde(rename = "unsubscribe")]
18    Unsubscribe(HermesClientMessageUnsubscribe),
19}
20
21#[derive(Serialize, Debug, Clone)]
22pub struct HermesClientMessageSubscribe {
23    pub ids: Vec<String>,
24    #[serde(default)]
25    pub verbose: bool,
26    #[serde(default)]
27    pub binary: bool,
28    #[serde(default)]
29    pub allow_out_of_order: bool,
30    #[serde(default)]
31    pub ignore_invalid_price_ids: bool,
32}
33
34#[derive(Serialize, Debug, Clone)]
35pub struct HermesClientMessageUnsubscribe {
36    pub ids: Vec<String>,
37}
38
39#[derive(Deserialize, Debug, Clone, Hash)]
40#[serde(tag = "type")]
41pub enum HermesServerMessage {
42    #[serde(rename = "response")]
43    Response(HermesServerResponseMessage),
44    #[serde(rename = "price_update")]
45    PriceUpdate { price_feed: HermesPriceFeed },
46}
47
48impl HermesServerMessage {
49    pub fn cache_key(&self) -> u64 {
50        let mut hasher = DefaultHasher::new();
51        self.hash(&mut hasher);
52        hasher.finish()
53    }
54}
55
56#[derive(Serialize, Deserialize, Debug, Clone, Hash)]
57#[serde(tag = "status")]
58pub enum HermesServerResponseMessage {
59    #[serde(rename = "success")]
60    Success,
61    #[serde(rename = "error")]
62    Err { error: String },
63}
64
65#[derive(Deserialize, Serialize, Debug, Clone, Hash)]
66pub struct HermesPriceFeed {
67    pub id: String,
68    pub price: HermesPrice,
69    pub ema_price: HermesPrice,
70    #[serde(skip_serializing_if = "Option::is_none")]
71    pub metadata: Option<HermesPriceFeedMetadata>,
72    #[serde(skip_serializing_if = "Option::is_none")]
73    pub vaa: Option<String>,
74}
75
76#[derive(Deserialize, Serialize, Debug, Clone, Hash)]
77pub struct HermesPrice {
78    #[serde(with = "pyth_sdk::utils::as_string")]
79    pub price: i64,
80    #[serde(with = "pyth_sdk::utils::as_string")]
81    pub conf: u64,
82    /// Exponent.
83    pub expo: i32,
84    /// Publish time.
85    pub publish_time: i64,
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
89pub struct HermesPriceFeedMetadata {
90    pub slot: Option<u64>,
91    pub emitter_chain: u16,
92    pub price_service_receive_time: Option<i64>,
93    pub prev_publish_time: Option<i64>,
94}
95
96/// A WebSocket client for consuming Pyth Hermes price feed updates
97///
98/// This client provides a simple interface to:
99/// - Connect to a Hermes WebSocket endpoint
100/// - Subscribe to price feed updates
101/// - Receive updates as a stream of messages
102///
103pub struct HermesWSConnection {
104    endpoint: Url,
105    ws_sender: Option<
106        futures_util::stream::SplitSink<
107            tokio_tungstenite::WebSocketStream<
108                tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
109            >,
110            Message,
111        >,
112    >,
113}
114
115impl HermesWSConnection {
116    /// Creates a new Hermes client instance
117    ///
118    /// # Arguments
119    /// * `endpoint` - The WebSocket URL of the Hermes service
120    ///
121    /// # Returns
122    /// Returns a new client instance (not yet connected)
123    pub fn new(endpoint: Url) -> Result<Self> {
124        Ok(Self {
125            endpoint,
126            ws_sender: None,
127        })
128    }
129
130    /// Starts the WebSocket connection
131    ///
132    /// # Returns
133    /// Returns a stream of responses from the server
134    pub async fn start(
135        &mut self,
136    ) -> Result<impl futures_util::Stream<Item = Result<HermesServerMessage>>> {
137        let url = self.endpoint.clone();
138
139        let (ws_stream, _) = connect_async(url).await?;
140        let (ws_sender, ws_receiver) = ws_stream.split();
141
142        self.ws_sender = Some(ws_sender);
143        let response_stream =
144            ws_receiver
145                .map_err(anyhow::Error::from)
146                .try_filter_map(|msg| async {
147                    let r: Result<Option<HermesServerMessage>> = match msg {
148                        Message::Text(text) => {
149                            Ok(Some(serde_json::from_str::<HermesServerMessage>(&text)?))
150                        }
151                        Message::Binary(_) => {
152                            warn!("Received unexpected binary message");
153                            Ok(None)
154                        }
155                        Message::Close(_) => {
156                            Err(anyhow!("WebSocket connection closed unexpectedly"))
157                        }
158                        _ => Ok(None),
159                    };
160                    r
161                });
162
163        Ok(response_stream)
164    }
165
166    pub async fn send_request(&mut self, request: HermesClientMessage) -> Result<()> {
167        if let Some(sender) = &mut self.ws_sender {
168            let msg = serde_json::to_string(&request)?;
169            sender.send(Message::Text(msg)).await?;
170            Ok(())
171        } else {
172            anyhow::bail!("WebSocket connection not started")
173        }
174    }
175
176    /// Closes the WebSocket connection
177    pub async fn close(&mut self) -> Result<()> {
178        if let Some(sender) = &mut self.ws_sender {
179            sender.send(Message::Close(None)).await?;
180            self.ws_sender = None;
181            Ok(())
182        } else {
183            anyhow::bail!("WebSocket connection not started")
184        }
185    }
186}