v_exchanges_api_generics 0.19.3

A client for HTTP/HTTPS/WebSocket APIs.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
use std::{
	time::{Duration, SystemTime},
	vec,
};

use ahash::AHashSet;
use eyre::{Result, bail};
use futures_util::{SinkExt as _, StreamExt as _};
use jiff::Timestamp;
use reqwest::Url;
use tokio::net::TcpStream;
use tokio_tungstenite::{
	MaybeTlsStream, WebSocketStream,
	tungstenite::{self, Bytes},
};
use tracing::instrument;

use crate::{ConstructAuthError, RetryConfig, UrlError, retry::ExponentialBackoff};

type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;

/// Handles exchange-level events on the [WsConnection].
///
/// A handler supplies the connection configuration, produces the messages that must be sent
/// right after (re)connecting, and classifies every JSON value received from the server.
pub trait WsHandler: std::fmt::Debug {
	/// Returns a [WsConfig] that will be applied for all WebSocket connections handled by this handler.
	///
	/// The default implementation returns [WsConfig::default].
	fn config(&self) -> Result<WsConfig, UrlError> {
		Ok(WsConfig::default())
	}

	/// Called when the [WsConnection] is created and on reconnection. Returned messages will be sent back to the server as-is.
	///
	/// Handling of `listen-key`s or any other authentication methods the exchange demands should be done here. Although oftentimes handling the auth will spread into [handle_jrpc](Self::handle_jrpc) too.
	/// Can be run multiple times (on every reconnect). Thus this inherently cannot be used to initiate connections based on a change of state (ie order creation).
	///
	/// The default implementation sends nothing.
	fn handle_auth(&mut self) -> Result<Vec<tungstenite::Message>, WsError> {
		Ok(vec![])
	}

	//Q: problem: can be either {String, serde_json::Value} //? other things?
	/*
	  "position"
	  ||
	  json!{
	"id": "56374a46-3061-486b-a311-99ee972eb648",
	"method": "order.place",
	"params": {
	  "symbol": "BTCUSDT",
	  "side": "SELL",
	  "type": "LIMIT",
	  "timeInForce": "GTC",
	  "price": "23416.10000000",
	  "quantity": "0.00847000",
	  "apiKey": "vmPUZE6mv9SD5VNHk4HlWFsOr6aKE2zvsw0MuIgwCIPy6utIco14y7Ju91duEh8A",
	  "signature": "15af09e41c36f3cc61378c2fbe2c33719a03dd5eba8d0f9206fbda44de717c88",
	  "timestamp": 1660801715431
	  }
	  }
	  - and then the latter could be requiring signing
	  */
	/// Builds the messages that subscribe the connection to `topics`.
	///
	/// NOTE(review): nothing in this file calls this method; presumably it is invoked by
	/// exchange-specific wrappers or from within [handle_auth](Self::handle_auth) - confirm.
	fn handle_subscribe(&mut self, topics: AHashSet<Topic>) -> Result<Vec<tungstenite::Message>, WsError>;

	/// Called when the [WsConnection] received a JSON-RPC value, returns messages to be sent to the server or the content with parsed event name. If it is not the desired content and no response is to be sent (like after a confirmation for a subscription), return a [Response](ResponseOrContent::Response) with an empty Vec.
	fn handle_jrpc(&mut self, jrpc: serde_json::Value) -> Result<ResponseOrContent, WsError>;
	//A: use this iff spot&&perp binance accept listen-key refresh through stream
	///// Additional POST communication with the exchange, not conditional on received messages, can be handled here.
	///// Really this is just for damn Binance with their stupid `listen-key` standard.
	//fn handle_post(&mut self) -> Result<Option<Vec<tungstenite::Message>>, WsError> {
	//	Ok(None)
	//}

	//#[allow(unused_variables)]
	//fn handle_jrpc(&mut self, jrpc: &serde_json::Value) -> Result<Option<Vec<tungstenite::Message>>, WsError> {
	//	Ok(None)
	//}
}

/// Outcome of [WsHandler::handle_jrpc] for a single received JSON value.
#[derive(Clone, Debug)]
pub enum ResponseOrContent {
	/// Messages that must be sent to the server in response to what was received.
	/// May be empty, in which case the received value is simply skipped.
	Response(Vec<tungstenite::Message>),
	/// Content received from the server; this is what [WsConnection::next] returns to the caller.
	Content(ContentEvent),
}
/// A content message extracted by the [WsHandler] and yielded by [WsConnection::next].
#[derive(Clone, Debug)]
pub struct ContentEvent {
	/// Raw JSON payload of the event.
	pub data: serde_json::Value,
	/// Topic the event belongs to. NOTE(review): presumably matches the subscribed topic string - confirm against handler implementations.
	pub topic: String,
	/// Timestamp associated with the event. NOTE(review): whether this is exchange time or local receive time is decided by the handler - confirm.
	pub time: Timestamp,
	/// Event name as parsed by the handler.
	pub event_type: String,
}

/// Pairs an event name with the function that turns that event's JSON payload into a `T`.
///
/// Equality and hashing consider `event_name` only, so a hash-based set holds at most one
/// interpreter per event name.
#[derive(Clone, Debug, Eq)]
pub struct TopicInterpreter<T> {
	/// Only one interpreter for this name is allowed to exist // enforced through the `Hash` and `PartialEq` impls, which are defined over `event_name` only
	pub event_name: String,
	/// When name matches, interpretation should succeed.
	pub interpret: fn(&serde_json::Value) -> Result<T, WsError>,
}
/// Hashes by `event_name` alone; the `interpret` function pointer does not participate.
impl<T> std::hash::Hash for TopicInterpreter<T> {
	fn hash<S: std::hash::Hasher>(&self, hasher: &mut S) {
		std::hash::Hash::hash(&self.event_name, hasher);
	}
}
/// Two interpreters are equal when they target the same `event_name`, regardless of `interpret`.
impl<T> PartialEq for TopicInterpreter<T> {
	fn eq(&self, rhs: &Self) -> bool {
		let (ours, theirs) = (&self.event_name, &rhs.event_name);
		ours == theirs
	}
}

/// Main way to interact with the WebSocket APIs.
///
/// Created with [try_new](Self::try_new); the underlying socket is opened lazily on the first
/// call to [next](Self::next) and is transparently re-opened when it is refreshed or lost.
#[derive(Debug)]
pub struct WsConnection<H: WsHandler> {
	/// Full URL to connect to, resolved once in `try_new`.
	url: Url,
	/// Configuration obtained from the handler in `try_new`.
	config: WsConfig,
	/// Exchange-specific logic: auth messages and interpretation of received JSON.
	handler: H,
	/// Live connection, `None` while disconnected.
	stream: Option<WsConnectionStream>,
	/// Delay generator used before each connection attempt; reset after a successful connect.
	backoff: ExponentialBackoff,
}
impl<H: WsHandler> WsConnection<H> {
	/// Builds a connection description without opening the socket yet.
	///
	/// The target URL is `url_suffix` joined onto [WsConfig::base_url] when the handler's config
	/// provides one, otherwise `url_suffix` is parsed as a complete URL on its own.
	///
	/// # Errors
	/// Fails if the handler cannot produce its config, if the resulting URL does not parse, or if
	/// the reconnect settings cannot be turned into an [ExponentialBackoff].
	pub fn try_new(url_suffix: &str, handler: H) -> Result<Self, WsError> {
		let config = handler.config()?;

		let resolved = if let Some(base) = config.base_url.as_ref() {
			base.join(url_suffix)
		} else {
			Url::parse(url_suffix)
		};
		let url = resolved.map_err(UrlError::Parse)?;

		let backoff = match ExponentialBackoff::try_from(&config.reconnect) {
			Ok(backoff) => backoff,
			Err(e) => return Err(WsError::Other(eyre::eyre!("Invalid reconnect backoff configuration: {e}"))),
		};

		let stream = None;
		Ok(Self { url, config, handler, stream, backoff })
	}

	/// The main interface. All ws operations are hidden, only thing getting through are the content messages or the lack thereof.
	pub async fn next(&mut self) -> Result<ContentEvent, WsError> {
		if let Some(inner) = &self.stream
			&& inner.connected_since + self.config.refresh_after < SystemTime::now()
		{
			tracing::info!("Refreshing connection, as `refresh_after` specified in WsConfig has elapsed ({:?})", self.config.refresh_after);
			self.reconnect().await?;
		}
		if self.stream.is_none() {
			self.connect().await?;
		}
		//- at this point self.inner is Some

		// loop until we get actual content
		let json_rpc_value = loop {
			// force a response out of the server.
			let resp = {
				let timeout = match self.stream.as_ref() {
					Some(stream) => match stream.last_unanswered_communication {
						Some(last_unanswered) => {
							let now = SystemTime::now();
							match last_unanswered + self.config.response_timeout > now {
								true => self.config.response_timeout,
								false => {
									tracing::error!(
										"Timeout for last unanswered communication ended before `.next()` was called. This likely indicates an implementation error on the clientside."
									);
									self.reconnect().await?;
									continue;
								}
							}
						}
						None => self.config.message_timeout,
					},
					None => {
						tracing::error!(
							"UNEXPECTED: Stream is None at ws.rs:172 despite guard at line 163. \
							Possible causes: (1) system hibernation/sleep caused stale state, \
							(2) memory corruption, (3) logic bug in reconnection flow, \
							(4) async cancellation. \
							Backoff current delay: {:?}. Attempting to reconnect...",
							self.backoff.current_delay()
						);
						self.connect().await?;
						continue;
					}
				};

				let timeout_handle = tokio::time::timeout(timeout, {
					let stream = self.stream.as_mut().unwrap();
					stream.next()
				});
				match timeout_handle.await {
					Ok(Some(resp)) => {
						self.stream.as_mut().unwrap().last_unanswered_communication = None;
						resp
					}
					Ok(None) => {
						tracing::warn!("tungstenite couldn't read from the stream. Restarting.");
						self.reconnect().await?;
						continue;
					}
					Err(timeout_error) => {
						tracing::warn!("Message reception timed out after {timeout:?} seconds. // {timeout_error}");
						{
							let stream = self.stream.as_mut().unwrap();
							match stream.last_unanswered_communication.is_some() {
								true => self.reconnect().await?,
								false => {
									// Reached standard message_timeout (one for messages sent when we're not forcing communication). So let's force it.
									self.send(tungstenite::Message::Ping(Bytes::default())).await?;
									continue;
								}
							}
						}
						continue;
					}
				}
			};

			// some response received, handle it
			match resp {
				Ok(succ_resp) => match succ_resp {
					tungstenite::Message::Text(text) => {
						let value: serde_json::Value =
							serde_json::from_str(&text).expect("API sent invalid JSON, which is completely unexpected. Disappointment is immeasurable and the day is ruined.");
						tracing::trace!("{value:#?}"); // only log it after the `handle_message` has ran, as we're assuming that if it takes any actions, it will handle logging itself. (and that will likely be at a different level of important too)
						break match { self.handler.handle_jrpc(value)? } {
							ResponseOrContent::Response(messages) => {
								self.send_all(messages).await?;
								continue; // only need to send responses when it's not yet the desired content.
							}
							ResponseOrContent::Content(content) => content,
						};
					}
					tungstenite::Message::Binary(_) => {
						panic!("Received binary. But exchanges are not smart enough to send this, what is happening");
					}
					tungstenite::Message::Ping(bytes) => {
						self.send(tungstenite::Message::Pong(bytes)).await?; // Binance specifically requires the exact ping's payload to be returned here: https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams
						tracing::debug!("ponged");
						continue;
					}
					// in most cases these are not seen, as it's sufficient to just answer to their [pings](tungstenite::Message::Ping). Our own pings are sent only when we haven't heard from the exchange for a while, in which case it's likely that it will not [pong](tungstenite::Message::Pong) back either.
					tungstenite::Message::Pong(_) => {
						tracing::info!("Received pong");
						continue;
					}
					tungstenite::Message::Close(maybe_reason) => {
						match maybe_reason {
							Some(close_frame) => {
								//Q: maybe need to expose def of this for ind exchanges (so we can interpret the codes)
								tracing::info!("Server closed connection; reason: {close_frame:?}");
							}
							None => {
								tracing::info!("Server closed connection; no reason specified.");
							}
						}
						self.stream = None;
						self.reconnect().await?;
						continue;
					}
					tungstenite::Message::Frame(_) => {
						unreachable!("Can't get from reading");
					}
				},
				Err(err) => match err {
					tungstenite::Error::ConnectionClosed => {
						tracing::error!("received `tungstenite::Error::ConnectionClosed` on polling. Will reconnect");
						self.stream = None;
						continue;
					}
					tungstenite::Error::AlreadyClosed => {
						tracing::error!("received `tungstenite::Error::AlreadyClosed` from polling. Will reconnect");
						self.stream = None;
						continue;
					}
					tungstenite::Error::Io(e) => {
						tracing::error!("received `tungstenite::Error::Io` from polling: {e:?}. Atm don't know valid cases of this happening given intact application state...");
						self.stream = None;
						continue;
					}
					tungstenite::Error::Tls(_tls_error) => todo!(),
					tungstenite::Error::Capacity(capacity_error) => {
						tracing::warn!("received `tungstenite::Error::Capacity` from polling: {capacity_error:?}. Skipping.");
						continue;
					}
					tungstenite::Error::Protocol(protocol_error) => {
						tracing::warn!("received `tungstenite::Error::Protocol` from polling: {protocol_error:?}. Will reconnect");
						self.stream = None;
						continue;
					}
					tungstenite::Error::WriteBufferFull(_) => unreachable!("can only get from writing"),
					tungstenite::Error::Utf8(e) => panic!("received `tungstenite::Error::Utf8` from polling: {e:?}. Exchange is going crazy, aborting"),
					tungstenite::Error::AttackAttempt => {
						tracing::warn!("received `tungstenite::Error::AttackAttempt` from polling. Don't have a reason to trust detection 100%, so just reconnecting.");
						self.stream = None;
						continue;
					}
					tungstenite::Error::Url(_url_error) => todo!(),
					tungstenite::Error::Http(_response) => todo!(),
					tungstenite::Error::HttpFormat(_error) => todo!(),
				},
			}
		};
		Ok(json_rpc_value)
	}

	#[instrument(skip_all)]
	async fn send_all(&mut self, messages: Vec<tungstenite::Message>) -> Result<(), tungstenite::Error> {
		if let Some(inner) = &mut self.stream {
			match messages.len() {
				0 => return Ok(()),
				1 => {
					tracing::debug!("sending to server: {:#?}", &messages[0]);
					inner.send(messages.into_iter().next().unwrap()).await?;
					inner.last_unanswered_communication = Some(SystemTime::now());
				}
				_ => {
					tracing::debug!("sending to server: {messages:#?}");
					let mut message_stream = futures_util::stream::iter(messages).map(Ok);
					inner.send_all(&mut message_stream).await?;
					inner.last_unanswered_communication = Some(SystemTime::now());
				}
			};
			Ok(())
		} else {
			Err(tungstenite::Error::ConnectionClosed)
		}
	}

	/// Sends a single message; thin wrapper over [send_all](Self::send_all).
	async fn send(&mut self, message: tungstenite::Message) -> Result<(), tungstenite::Error> {
		// Vec cost is negligible
		let single = vec![message];
		self.send_all(single).await
	}

	async fn connect(&mut self) -> Result<(), WsError> {
		tracing::info!("Connecting to {}...", self.url);
		let delay = self.backoff.next_duration();
		if !delay.is_zero() {
			tracing::warn!(delay_ms = delay.as_millis(), "Reconnect backoff active. Likely indicative of a bad connection.");
			tokio::time::sleep(delay).await;
		}

		let (stream, http_resp) = tokio_tungstenite::connect_async(self.url.as_str()).await?;
		tracing::debug!("Ws handshake with server: {http_resp:#?}");

		let now = SystemTime::now();
		self.stream = Some(WsConnectionStream::new(stream, now));

		let auth_messages = self.handler.handle_auth()?;
		self.send_all(auth_messages).await?;
		self.backoff.reset();
		Ok(())
	}

	/// Closes the current connection (if there is one) on a best-effort basis and opens a fresh one.
	///
	/// A `Close` frame is sent to the old stream before it is dropped; failure to send it is only logged.
	///
	/// `pub` for testing only, does not {have to || is expected to} be exposed in any wrappers.
	pub async fn reconnect(&mut self) -> Result<(), WsError> {
		if let Some(old) = self.stream.as_mut() {
			tracing::info!("Dropping old connection before reconnecting...");
			// Best-effort close - the connection may already be broken, so an error here is not propagated
			let close_outcome = old.send(tungstenite::Message::Close(None)).await;
			if let Err(e) = close_outcome {
				tracing::debug!("Failed to send Close frame (connection likely already dead): {e}");
			}
			self.stream = None;
		}
		self.connect().await
	}
}

/// Configuration for [WsHandler].
///
/// Should be returned by [WsHandler::config].
#[derive(Clone, Debug)]
pub struct WsConfig {
	/// Whether the connection should be authenticated. Normally implemented through a "listen key"
	pub auth: bool,
	/// Prefix which will be used for connections that started using this `WsConfig`.
	/// When `None`, the `url_suffix` given to [WsConnection::try_new] must be a complete URL.
	///
	/// Ex: `"wss://example.com"`
	pub base_url: Option<Url>,
	/// Backoff configuration for reconnect attempts.
	pub reconnect: RetryConfig,
	/// The [WsConnection] will automatically reconnect when `refresh_after` has elapsed since the last connection started.
	refresh_after: Duration,
	/// If no messages are received within this amount of time, a Ping is sent to force a response;
	/// a reconnection follows only if that also goes unanswered for `response_timeout`.
	message_timeout: Duration,
	/// Timeout for the response to a message sent to the server.
	///
	/// Difference from the `message_timeout` is that here we directly request communication. Eg: sending a Ping or attempting to auth.
	response_timeout: Duration,
	/// The topics that will be subscribed to on creation of the connection. Note that we don't allow for passing anything that changes state here like [Order](Topic::Order) payloads, thus submissions are limited to [String]s
	pub topics: AHashSet<String>,
}
/// Setters for [WsConfig]. The timing fields are private so that zero durations can be rejected here.
impl WsConfig {
	/// Replaces the reconnect backoff configuration.
	pub fn set_reconnect(&mut self, reconnect: RetryConfig) {
		self.reconnect = reconnect;
	}

	/// Sets how long a connection may live before it is proactively re-established.
	///
	/// # Errors
	/// Fails if `refresh_after` is zero.
	pub fn set_refresh_after(&mut self, refresh_after: Duration) -> Result<()> {
		if refresh_after.is_zero() {
			bail!("refresh_after must be greater than 0");
		}
		self.refresh_after = refresh_after;
		Ok(())
	}

	/// Sets how long to wait for any message from the server before forcing communication.
	///
	/// # Errors
	/// Fails if `message_timeout` is zero.
	pub fn set_message_timeout(&mut self, message_timeout: Duration) -> Result<()> {
		if message_timeout.is_zero() {
			bail!("message_timeout must be greater than 0");
		}
		self.message_timeout = message_timeout;
		Ok(())
	}

	/// Sets how long to wait for the server to answer a message we sent.
	///
	/// # Errors
	/// Fails if `response_timeout` is zero.
	pub fn set_response_timeout(&mut self, response_timeout: Duration) -> Result<()> {
		if response_timeout.is_zero() {
			bail!("response_timeout must be greater than 0");
		}
		self.response_timeout = response_timeout;
		Ok(())
	}

	/// Misspelled alias of [set_response_timeout](Self::set_response_timeout), kept so existing callers keep compiling.
	pub fn set_response_timout(&mut self, response_timeout: Duration) -> Result<()> {
		self.set_response_timeout(response_timeout)
	}
}

/// Errors produced by [WsConnection] and [WsHandler] implementations.
#[derive(Debug, miette::Diagnostic, derive_more::Display, thiserror::Error, derive_more::From)]
pub enum WsError {
	/// The connection was defined incorrectly (see [WsDefinitionError]).
	#[diagnostic(transparent)]
	Definition(WsDefinitionError),
	/// Error reported by the underlying `tungstenite` WebSocket implementation.
	#[diagnostic(code(v_exchanges::ws::tungstenite), help("WebSocket protocol error. The connection may need to be reestablished."))]
	Tungstenite(tungstenite::Error),
	/// Authentication data could not be constructed.
	#[diagnostic(transparent)]
	Auth(ConstructAuthError),
	/// A received message could not be parsed as JSON.
	#[diagnostic(code(v_exchanges::ws::parse), help("Failed to parse WebSocket message. Check if the exchange API has changed."))]
	Parse(serde_json::Error),
	/// Subscription-related failure, described by the contained message.
	#[diagnostic(code(v_exchanges::ws::subscription))]
	Subscription(String),
	/// The network connection could not be established.
	#[diagnostic(code(v_exchanges::ws::network), help("Network connection failed. Check your internet connection."))]
	NetworkConnection,
	/// The connection URL is invalid.
	#[diagnostic(transparent)]
	Url(UrlError),
	/// The server sent an event the handler did not expect; carries the offending JSON.
	#[diagnostic(code(v_exchanges::ws::unexpected_event), help("Received an unexpected event from the WebSocket. This may indicate an API change."))]
	UnexpectedEvent(serde_json::Value),
	/// Any other error.
	#[error(transparent)]
	Other(eyre::Report),
}
/// Errors caused by an incomplete or invalid definition of a WebSocket connection.
#[derive(Debug, miette::Diagnostic, derive_more::Display, thiserror::Error)]
pub enum WsDefinitionError {
	/// No base URL was configured where one is required.
	#[diagnostic(code(v_exchanges::ws::definition::missing_url), help("WebSocket base URL must be configured in WsConfig."))]
	MissingUrl,
}
/// A subscription request passed to [WsHandler::handle_subscribe].
#[derive(Clone, Debug, derive_more::Display, Eq, Hash, PartialEq, serde::Serialize)]
pub enum Topic {
	/// Topic identified by a plain string.
	String(String),
	/// Topic carrying a JSON payload. NOTE(review): presumably an order-placement request that may require signing - confirm against handler implementations.
	Order(serde_json::Value),
}
/// A live WebSocket stream together with the bookkeeping [WsConnection] needs for refreshes and timeouts.
///
/// Dereferences to the inner [WsStream], so stream/sink methods can be called on it directly.
#[derive(Debug, derive_more::Deref, derive_more::DerefMut, derive_new::new)]
struct WsConnectionStream {
	/// The underlying WebSocket stream.
	#[deref_mut]
	#[deref]
	stream: WsStream,
	/// When this connection was established; compared against `refresh_after`.
	connected_since: SystemTime,
	/// Time of the last send that has not been followed by any received message yet; `None` when nothing is pending.
	#[new(default)]
	last_unanswered_communication: Option<SystemTime>,
}

/// Defaults: no auth, no base URL, no topics; effectively unlimited reconnect attempts with
/// exponential backoff from 1s up to 30s; refresh every 12 hours; 16 minutes of silence before
/// forcing communication; 2 minutes to wait for a response.
impl Default for WsConfig {
	fn default() -> Self {
		// Keep retrying forever: a dropped exchange connection should always come back eventually.
		let reconnect = RetryConfig {
			max_retries: u32::MAX,
			initial_delay_ms: 1_000,
			max_delay_ms: 30_000,
			backoff_factor: 2.0,
			jitter_ms: 500,
			immediate_first: false,
			max_elapsed_ms: None,
		};
		let refresh_after = Duration::from_secs(12 * 60 * 60);
		let message_timeout = Duration::from_secs(16 * 60);
		let response_timeout = Duration::from_secs(2 * 60);

		Self {
			auth: false,
			base_url: None,
			reconnect,
			refresh_after,
			message_timeout,
			response_timeout,
			topics: AHashSet::new(),
		}
	}
}

//DEPRECATE: or reinstate, - can't even remember what's this now
//#[derive(Debug, derive_more::Display, thiserror::Error)]
//pub enum SubscriptionError {
//	Topic(String),
//	Params(serde_json::Value),
//	Incompatible(IncompatibleSubscriptionError),
//}
//#[derive(Debug, thiserror::Error)]
//#[error("Incompatible subscription error: could not subscribe to {topic:#?} on {base_url}")]
//pub struct IncompatibleSubscriptionError {
//	topic: Topic,
//	base_url: Url,
//}