1use crate::error::{Error, Result};
12use futures_util::{SinkExt, StreamExt};
13use serde_json::Value;
14use std::collections::HashMap;
15use std::sync::{Arc, Mutex};
16use tokio::sync::mpsc;
17use tokio_tungstenite::tungstenite::Message as WsMessage;
18
/// Event query that selects every `NewBlock` event.
pub const QUERY_NEW_BLOCK: &str = "tm.event='NewBlock'";

/// Event query that selects every `Tx` event.
///
/// `SubscribeClient::subscribe_tx` uses this as the default query and as the
/// leading clause when the caller's filter does not mention `tm.event`.
pub const QUERY_TX: &str = "tm.event='Tx'";
23
24#[derive(Debug, Clone)]
26pub struct Event {
27 pub query: String,
29 pub data: Value,
31 pub result: Value,
33}
34
35type Subs = Arc<Mutex<HashMap<String, mpsc::UnboundedSender<Event>>>>;
36
37pub struct SubscribeClient {
39 writer: mpsc::UnboundedSender<WsMessage>,
40 subs: Subs,
41 next_id: Arc<Mutex<u64>>,
42}
43
44pub struct Subscription {
46 pub events: mpsc::UnboundedReceiver<Event>,
48 id: String,
49 query: String,
50 writer: mpsc::UnboundedSender<WsMessage>,
51 subs: Subs,
52}
53
54impl Subscription {
55 pub fn unsubscribe(&self) -> Result<()> {
58 self.subs.lock().unwrap().remove(&self.id);
59 let req = serde_json::json!({
60 "jsonrpc": "2.0",
61 "id": format!("{}-unsub", self.id),
62 "method": "unsubscribe",
63 "params": { "query": self.query },
64 });
65 self.writer
66 .send(WsMessage::Text(req.to_string()))
67 .map_err(|e| Error::Transport(format!("send unsubscribe: {e}")))
68 }
69}
70
71impl SubscribeClient {
72 pub async fn connect(endpoint: &str) -> Result<Self> {
79 let ws_url = normalize_ws_url(endpoint);
80 let (stream, _) = tokio_tungstenite::connect_async(&ws_url)
81 .await
82 .map_err(|e| Error::Transport(format!("dial {ws_url}: {e}")))?;
83 let (mut sink, mut source) = stream.split();
84
85 let (write_tx, mut write_rx) = mpsc::unbounded_channel::<WsMessage>();
86 let subs: Subs = Arc::new(Mutex::new(HashMap::new()));
87
88 tokio::spawn(async move {
90 while let Some(msg) = write_rx.recv().await {
91 if sink.send(msg).await.is_err() {
92 break;
93 }
94 }
95 });
96
97 let reader_subs = subs.clone();
99 tokio::spawn(async move {
100 while let Some(Ok(msg)) = source.next().await {
101 if let WsMessage::Text(text) = msg {
102 dispatch_frame(&reader_subs, text.as_bytes());
103 }
104 }
105 });
106
107 Ok(Self {
108 writer: write_tx,
109 subs,
110 next_id: Arc::new(Mutex::new(0)),
111 })
112 }
113
114 pub fn subscribe(&self, query: &str) -> Result<Subscription> {
117 let id = {
118 let mut n = self.next_id.lock().unwrap();
119 *n += 1;
120 n.to_string()
121 };
122 let (tx, rx) = mpsc::unbounded_channel::<Event>();
123 self.subs.lock().unwrap().insert(id.clone(), tx);
124
125 let req = serde_json::json!({
126 "jsonrpc": "2.0",
127 "id": id,
128 "method": "subscribe",
129 "params": { "query": query },
130 });
131 self.writer
132 .send(WsMessage::Text(req.to_string()))
133 .map_err(|e| {
134 self.subs.lock().unwrap().remove(&id);
135 Error::Transport(format!("send subscribe: {e}"))
136 })?;
137
138 Ok(Subscription {
139 events: rx,
140 id,
141 query: query.to_string(),
142 writer: self.writer.clone(),
143 subs: self.subs.clone(),
144 })
145 }
146
147 pub fn subscribe_new_blocks(&self) -> Result<Subscription> {
149 self.subscribe(QUERY_NEW_BLOCK)
150 }
151
152 pub fn subscribe_tx(&self, query: &str) -> Result<Subscription> {
156 let q = if query.trim().is_empty() {
157 QUERY_TX.to_string()
158 } else if !query.contains("tm.event") {
159 format!("{QUERY_TX} AND {query}")
160 } else {
161 query.to_string()
162 };
163 self.subscribe(&q)
164 }
165}
166
167pub fn dispatch_frame(subs: &Subs, frame: &[u8]) {
173 let v: Value = match serde_json::from_slice(frame) {
174 Ok(v) => v,
175 Err(_) => return,
176 };
177 let result = &v["result"];
178 let data = &result["data"];
179 if data.is_null() {
181 return;
182 }
183 let id = match v["id"].as_str() {
184 Some(id) => id.to_string(),
185 None => return,
186 };
187 let tx = {
188 let guard = subs.lock().unwrap();
189 match guard.get(&id) {
190 Some(tx) => tx.clone(),
191 None => return,
192 }
193 };
194 let _ = tx.send(Event {
195 query: result["query"].as_str().unwrap_or("").to_string(),
196 data: data.clone(),
197 result: result.clone(),
198 });
199}
200
201pub fn new_subs() -> Subs {
203 Arc::new(Mutex::new(HashMap::new()))
204}
205
206pub fn register_for_test(subs: &Subs, id: &str) -> mpsc::UnboundedReceiver<Event> {
209 let (tx, rx) = mpsc::unbounded_channel::<Event>();
210 subs.lock().unwrap().insert(id.to_string(), tx);
211 rx
212}
213
/// Converts an RPC endpoint into the WebSocket URL to dial.
///
/// Scheme handling:
///
/// * `https://` becomes `wss://`;
/// * `http://` and `tcp://` become `ws://` (node RPC addresses are commonly
///   written in the `tcp://host:port` form);
/// * a non-empty endpoint with no `://` at all (for example `host:port`) is
///   given a `ws://` prefix;
/// * anything else, including `ws://` and `wss://`, keeps its scheme.
///
/// All trailing `/` characters are then removed and `/websocket` is appended
/// unless the URL already ends with it.
fn normalize_ws_url(endpoint: &str) -> String {
    let mut url = if let Some(rest) = endpoint.strip_prefix("https://") {
        format!("wss://{rest}")
    } else if let Some(rest) = endpoint
        .strip_prefix("http://")
        .or_else(|| endpoint.strip_prefix("tcp://"))
    {
        format!("ws://{rest}")
    } else if !endpoint.is_empty() && !endpoint.contains("://") {
        // Bare `host:port`: assume an unencrypted WebSocket.
        format!("ws://{endpoint}")
    } else {
        endpoint.to_string()
    };

    // '/' is ASCII, so the trimmed length always falls on a char boundary.
    let trimmed_len = url.trim_end_matches('/').len();
    url.truncate(trimmed_len);

    if !url.ends_with("/websocket") {
        url.push_str("/websocket");
    }
    url
}
232
233pub type SubscriptionMap = Subs;