use std::{
collections::{hash_map::Entry, HashMap},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use async_trait::async_trait;
use futures03::{stream::SplitSink, SinkExt, StreamExt};
use hyper::{
header::{
AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
USER_AGENT,
},
Uri,
};
#[cfg(test)]
use mockall::automock;
use thiserror::Error;
use tokio::{
net::TcpStream,
sync::{
mpsc::{self, error::TrySendError, Receiver, Sender},
oneshot, Mutex, MutexGuard, Notify,
},
task::JoinHandle,
time::sleep,
};
use tokio_tungstenite::{
connect_async,
tungstenite::{
self,
handshake::client::{generate_key, Request},
},
MaybeTlsStream, WebSocketStream,
};
use tracing::{debug, error, info, instrument, trace, warn};
use tycho_common::dto::{
BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage, WebsocketError,
};
use uuid::Uuid;
use zstd;
use crate::TYCHO_SERVER_VERSION;
#[derive(Error, Debug)]
pub enum DeltasError {
#[error("Failed to parse URI: {0}. Error: {1}")]
UriParsing(String, String),
#[error("The requested subscription is already pending")]
SubscriptionAlreadyPending,
#[error("The server replied with an error: {0}")]
ServerError(String, #[source] WebsocketError),
#[error("{0}")]
TransportError(String),
#[error("The buffer is full!")]
BufferFull,
#[error("The client is not connected!")]
NotConnected,
#[error("The client is already connected!")]
AlreadyConnected,
#[error("The server closed the connection!")]
ConnectionClosed,
#[error("Connection error: {0}")]
ConnectionError(#[from] Box<tungstenite::Error>),
#[error("Tycho FatalError: {0}")]
Fatal(String),
}
#[derive(Clone, Debug)]
pub struct SubscriptionOptions {
include_state: bool,
compression: bool,
partial_blocks: bool,
}
impl Default for SubscriptionOptions {
fn default() -> Self {
Self { include_state: true, compression: true, partial_blocks: false }
}
}
impl SubscriptionOptions {
pub fn new() -> Self {
Self::default()
}
pub fn with_state(mut self, val: bool) -> Self {
self.include_state = val;
self
}
pub fn with_compression(mut self, val: bool) -> Self {
self.compression = val;
self
}
pub fn with_partial_blocks(mut self, val: bool) -> Self {
self.partial_blocks = val;
self
}
}
#[cfg_attr(test, automock)]
#[async_trait]
pub trait DeltasClient {
async fn subscribe(
&self,
extractor_id: ExtractorIdentity,
options: SubscriptionOptions,
) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;
async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;
async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;
async fn close(&self) -> Result<(), DeltasError>;
}
#[derive(Clone)]
pub struct WsDeltasClient {
uri: Uri,
auth_key: Option<String>,
max_reconnects: u64,
retry_cooldown: Duration,
ws_buffer_size: usize,
subscription_buffer_size: usize,
conn_notify: Arc<Notify>,
inner: Arc<Mutex<Option<Inner>>>,
dead: Arc<AtomicBool>,
}
type WebSocketSink =
SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
#[derive(Debug)]
enum SubscriptionInfo {
RequestedSubscription(oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>),
Active,
RequestedUnsubscription(oneshot::Sender<()>),
}
struct Inner {
sink: WebSocketSink,
cmd_tx: Sender<()>,
pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
subscriptions: HashMap<Uuid, SubscriptionInfo>,
sender: HashMap<Uuid, Sender<BlockChanges>>,
buffer_size: usize,
}
impl Inner {
fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
Self {
sink,
cmd_tx,
pending: HashMap::new(),
subscriptions: HashMap::new(),
sender: HashMap::new(),
buffer_size,
}
}
#[allow(clippy::result_large_err)]
fn new_subscription(
&mut self,
id: &ExtractorIdentity,
ready_tx: oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>,
) -> Result<(), DeltasError> {
if self.pending.contains_key(id) {
return Err(DeltasError::SubscriptionAlreadyPending);
}
self.pending
.insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
Ok(())
}
fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
if let Some(info) = self.pending.remove(extractor_id) {
if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
let (tx, rx) = mpsc::channel(self.buffer_size);
self.sender.insert(subscription_id, tx);
self.subscriptions
.insert(subscription_id, SubscriptionInfo::Active);
let _ = ready_tx
.send(Ok((subscription_id, rx)))
.map_err(|_| {
warn!(
?extractor_id,
?subscription_id,
"Subscriber for has gone away. Ignoring."
)
});
} else {
error!(
?extractor_id,
?subscription_id,
"Pending subscription was not in the correct state to
transition to active. Ignoring!"
)
}
} else {
error!(
?extractor_id,
?subscription_id,
"Tried to mark an unknown subscription as active. Ignoring!"
);
}
}
#[allow(clippy::result_large_err)]
fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
if let Some(sender) = self.sender.get_mut(id) {
sender
.try_send(msg)
.map_err(|e| match e {
TrySendError::Full(_) => DeltasError::BufferFull,
TrySendError::Closed(_) => {
DeltasError::TransportError("The subscriber has gone away".to_string())
}
})?;
}
Ok(())
}
fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
if let Some(info) = self
.subscriptions
.get_mut(subscription_id)
{
if let SubscriptionInfo::Active = info {
*info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
}
} else {
debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
}
}
fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
if let Entry::Occupied(e) = self
.subscriptions
.entry(subscription_id)
{
let info = e.remove();
if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
let _ = tx.send(()).map_err(|_| {
debug!(?subscription_id, "failed to notify about removed subscription")
});
self.sender
.remove(&subscription_id)
.ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
} else {
warn!(?subscription_id, "Subscription ended unexpectedly!");
self.sender
.remove(&subscription_id)
.ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
}
} else {
trace!(
?subscription_id,
"Received `SubscriptionEnded`, but was never subscribed to it. This is likely a bug!"
);
}
Ok(())
}
fn cancel_pending(&mut self, extractor_id: &ExtractorIdentity, error: &WebsocketError) {
if let Some(sub_info) = self.pending.remove(extractor_id) {
match sub_info {
SubscriptionInfo::RequestedSubscription(tx) => {
let _ = tx
.send(Err(DeltasError::ServerError(
format!("Subscription failed: {error}"),
error.clone(),
)))
.map_err(|_| debug!("Cancel pending failed: receiver deallocated!"));
}
_ => {
error!(?extractor_id, "Pending subscription in wrong state")
}
}
} else {
debug!(?extractor_id, "Tried cancel on non-existent pending subscription!")
}
}
async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
self.sink.send(msg).await.map_err(|e| {
DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
})
}
}
impl WsDeltasClient {
#[allow(clippy::result_large_err)]
pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
let uri = ws_uri
.parse::<Uri>()
.map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
Ok(Self {
uri,
auth_key: auth_key.map(|s| s.to_string()),
inner: Arc::new(Mutex::new(None)),
ws_buffer_size: 128,
subscription_buffer_size: 128,
conn_notify: Arc::new(Notify::new()),
max_reconnects: 5,
retry_cooldown: Duration::from_millis(500),
dead: Arc::new(AtomicBool::new(false)),
})
}
#[allow(clippy::result_large_err)]
pub fn new_with_reconnects(
ws_uri: &str,
auth_key: Option<&str>,
max_reconnects: u64,
retry_cooldown: Duration,
) -> Result<Self, DeltasError> {
let uri = ws_uri
.parse::<Uri>()
.map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
Ok(Self {
uri,
auth_key: auth_key.map(|s| s.to_string()),
inner: Arc::new(Mutex::new(None)),
ws_buffer_size: 128,
subscription_buffer_size: 128,
conn_notify: Arc::new(Notify::new()),
max_reconnects,
retry_cooldown,
dead: Arc::new(AtomicBool::new(false)),
})
}
#[cfg(test)]
#[allow(clippy::result_large_err)]
pub fn new_with_custom_buffers(
ws_uri: &str,
auth_key: Option<&str>,
ws_buffer_size: usize,
subscription_buffer_size: usize,
) -> Result<Self, DeltasError> {
let uri = ws_uri
.parse::<Uri>()
.map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
Ok(Self {
uri,
auth_key: auth_key.map(|s| s.to_string()),
inner: Arc::new(Mutex::new(None)),
ws_buffer_size,
subscription_buffer_size,
conn_notify: Arc::new(Notify::new()),
max_reconnects: 5,
retry_cooldown: Duration::from_millis(0),
dead: Arc::new(AtomicBool::new(false)),
})
}
async fn is_connected(&self) -> bool {
let guard = self.inner.as_ref().lock().await;
guard.is_some()
}
async fn ensure_connection(&self) -> Result<(), DeltasError> {
if self.dead.load(Ordering::SeqCst) {
return Err(DeltasError::NotConnected)
};
if !self.is_connected().await {
self.conn_notify.notified().await;
};
Ok(())
}
#[instrument(skip(self, msg))]
async fn handle_msg(
&self,
msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
) -> Result<(), DeltasError> {
let mut guard = self.inner.lock().await;
match msg {
Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
serde_json::Value,
>(&text)
{
Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
Ok(ws_message) => match ws_message {
WebSocketMessage::BlockChanges { subscription_id, deltas } => {
Self::handle_block_changes_msg(&mut guard, subscription_id, deltas).await?;
}
WebSocketMessage::Response(Response::NewSubscription {
extractor_id,
subscription_id,
}) => {
info!(?extractor_id, ?subscription_id, "Received a new subscription");
let inner = guard
.as_mut()
.ok_or_else(|| DeltasError::NotConnected)?;
inner.mark_active(&extractor_id, subscription_id);
}
WebSocketMessage::Response(Response::SubscriptionEnded {
subscription_id,
}) => {
info!(?subscription_id, "Received a subscription ended");
let inner = guard
.as_mut()
.ok_or_else(|| DeltasError::NotConnected)?;
inner.remove_subscription(subscription_id)?;
}
WebSocketMessage::Response(Response::Error(error)) => match &error {
WebsocketError::ExtractorNotFound(extractor_id) => {
let inner = guard
.as_mut()
.ok_or_else(|| DeltasError::NotConnected)?;
inner.cancel_pending(extractor_id, &error);
}
WebsocketError::SubscriptionNotFound(subscription_id) => {
debug!("Received subscription not found, removing subscription");
let inner = guard
.as_mut()
.ok_or_else(|| DeltasError::NotConnected)?;
inner.remove_subscription(*subscription_id)?;
}
WebsocketError::ParseError(raw, e) => {
return Err(DeltasError::ServerError(
format!(
"Server failed to parse client message: {e}, msg: {raw}"
),
error.clone(),
))
}
WebsocketError::CompressionError(subscription_id, e) => {
return Err(DeltasError::ServerError(
format!(
"Server failed to compress message for subscription: {subscription_id}, error: {e}"
),
error.clone(),
))
}
WebsocketError::SubscribeError(extractor_id) => {
let inner = guard
.as_mut()
.ok_or_else(|| DeltasError::NotConnected)?;
inner.cancel_pending(extractor_id, &error);
}
},
},
Err(e) => {
error!(
"Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
e, text
);
}
},
Err(e) => {
error!(
"Failed to deserialize message: invalid JSON. {} \nMessage: {}",
e, text
);
}
},
Ok(tungstenite::protocol::Message::Binary(data)) => {
match zstd::decode_all(data.as_slice()) {
Ok(decompressed) => match serde_json::from_slice::<serde_json::Value>(decompressed.as_slice()) {
Ok(value) => match serde_json::from_value::<WebSocketMessage>(value.clone()) {
Ok(ws_message) => match ws_message {
WebSocketMessage::BlockChanges { subscription_id, deltas } => {
Self::handle_block_changes_msg(&mut guard, subscription_id, deltas).await?;
}
_ => {
error!(
"Received unsupported compressed WebSocketMessage variant. \nMessage: {ws_message:?}",
);
}
},
Err(e) => {
error!(
"Failed to deserialize compressed WebSocketMessage: {e}. \nMessage: {value:?}",
);
}
},
Err(e) => {
error!(
"Failed to deserialize compressed message: invalid JSON. {e}",
);
}
},
Err(e) => {
error!("Failed to decompress zstd data: {}", e);
}
}
},
Ok(tungstenite::protocol::Message::Ping(_)) => {
let inner = guard
.as_mut()
.ok_or_else(|| DeltasError::NotConnected)?;
if let Err(error) = inner
.ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
.await
{
debug!(?error, "Failed to send pong!");
}
}
Ok(tungstenite::protocol::Message::Pong(_)) => {
}
Ok(tungstenite::protocol::Message::Close(_)) => {
return Err(DeltasError::ConnectionClosed);
}
Ok(unknown_msg) => {
info!("Received an unknown message type: {:?}", unknown_msg);
}
Err(error) => {
error!(?error, "Websocket error");
return Err(match error {
tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
tungstenite::Error::AlreadyClosed => {
warn!("Received AlreadyClosed error which is indicative of a bug!");
DeltasError::ConnectionError(Box::new(error))
}
tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
DeltasError::ConnectionError(Box::new(error))
}
_ => DeltasError::Fatal(error.to_string()),
});
}
};
Ok(())
}
async fn handle_block_changes_msg(
guard: &mut MutexGuard<'_, Option<Inner>>,
subscription_id: Uuid,
deltas: BlockChanges,
) -> Result<(), DeltasError> {
trace!(?deltas, "Received a block state change, sending to channel");
let inner = guard
.as_mut()
.ok_or_else(|| DeltasError::NotConnected)?;
match inner.send(&subscription_id, deltas) {
Err(DeltasError::BufferFull) => {
error!(?subscription_id, "Buffer full, unsubscribing!");
Self::force_unsubscribe(subscription_id, inner).await;
}
Err(_) => {
warn!(?subscription_id, "Receiver for has gone away, unsubscribing!");
Self::force_unsubscribe(subscription_id, inner).await;
}
_ => { }
}
Ok(())
}
async fn force_unsubscribe(subscription_id: Uuid, inner: &mut Inner) {
if let Some(SubscriptionInfo::RequestedUnsubscription(_)) = inner
.subscriptions
.get(&subscription_id)
{
return
}
let (tx, rx) = oneshot::channel();
if let Err(e) = WsDeltasClient::unsubscribe_inner(inner, subscription_id, tx).await {
warn!(?e, ?subscription_id, "Failed to send unsubscribe command");
} else {
match tokio::time::timeout(Duration::from_secs(5), rx).await {
Ok(_) => {
debug!(?subscription_id, "Unsubscribe completed successfully");
}
Err(_) => {
warn!(?subscription_id, "Unsubscribe completion timed out");
}
}
}
}
async fn unsubscribe_inner(
inner: &mut Inner,
subscription_id: Uuid,
ready_tx: oneshot::Sender<()>,
) -> Result<(), DeltasError> {
debug!(?subscription_id, "Unsubscribing");
inner.end_subscription(&subscription_id, ready_tx);
let cmd = Command::Unsubscribe { subscription_id };
inner
.ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
|e| {
DeltasError::TransportError(format!(
"Failed to serialize unsubscribe command: {e}"
))
},
)?))
.await?;
Ok(())
}
}
#[async_trait]
impl DeltasClient for WsDeltasClient {
#[instrument(skip(self))]
async fn subscribe(
&self,
extractor_id: ExtractorIdentity,
options: SubscriptionOptions,
) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
trace!("Starting subscribe");
self.ensure_connection().await?;
let (ready_tx, ready_rx) = oneshot::channel();
{
let mut guard = self.inner.lock().await;
let inner = guard
.as_mut()
.ok_or_else(|| DeltasError::NotConnected)?;
trace!("Sending subscribe command");
inner.new_subscription(&extractor_id, ready_tx)?;
let cmd = Command::Subscribe {
extractor_id,
include_state: options.include_state,
compression: options.compression,
partial_blocks: options.partial_blocks,
};
inner
.ws_send(tungstenite::protocol::Message::Text(
serde_json::to_string(&cmd).map_err(|e| {
DeltasError::TransportError(format!(
"Failed to serialize subscribe command: {e}"
))
})?,
))
.await?;
}
trace!("Waiting for subscription response");
let res = ready_rx.await.map_err(|_| {
DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
})??;
trace!("Subscription successful");
Ok(res)
}
#[instrument(skip(self))]
async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
self.ensure_connection().await?;
let (ready_tx, ready_rx) = oneshot::channel();
{
let mut guard = self.inner.lock().await;
let inner = guard
.as_mut()
.ok_or_else(|| DeltasError::NotConnected)?;
WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
}
ready_rx.await.map_err(|_| {
DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
})?;
Ok(())
}
#[instrument(skip(self))]
async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
if self.is_connected().await {
return Err(DeltasError::AlreadyConnected);
}
let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
info!(?ws_uri, "Starting TychoWebsocketClient");
let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
{
let mut guard = self.inner.as_ref().lock().await;
*guard = None;
}
let this = self.clone();
let jh = tokio::spawn(async move {
let mut retry_count = 0;
let mut result = Err(DeltasError::NotConnected);
'retry: while retry_count < this.max_reconnects {
info!(?ws_uri, retry_count, "Connecting to WebSocket server");
if retry_count > 0 {
sleep(this.retry_cooldown).await;
}
let mut request_builder = Request::builder()
.uri(&ws_uri)
.header(SEC_WEBSOCKET_KEY, generate_key())
.header(SEC_WEBSOCKET_VERSION, 13)
.header(CONNECTION, "Upgrade")
.header(UPGRADE, "websocket")
.header(
HOST,
this.uri.host().ok_or_else(|| {
DeltasError::UriParsing(
ws_uri.clone(),
"No host found in tycho url".to_string(),
)
})?,
)
.header(
USER_AGENT,
format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
);
if let Some(ref key) = this.auth_key {
request_builder = request_builder.header(AUTHORIZATION, key);
}
let request = request_builder.body(()).map_err(|e| {
DeltasError::TransportError(format!("Failed to build connection request: {e}"))
})?;
let (conn, _) = match connect_async(request).await {
Ok(conn) => conn,
Err(e) => {
retry_count += 1;
let mut guard = this.inner.as_ref().lock().await;
*guard = None;
warn!(
e = e.to_string(),
"Failed to connect to WebSocket server; Reconnecting"
);
continue 'retry;
}
};
let (ws_tx_new, ws_rx_new) = conn.split();
{
let mut guard = this.inner.as_ref().lock().await;
*guard =
Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
}
let mut msg_rx = ws_rx_new.boxed();
info!("Connection Successful: TychoWebsocketClient started");
this.conn_notify.notify_waiters();
result = Ok(());
loop {
let res = tokio::select! {
msg = msg_rx.next() => match msg {
Some(msg) => this.handle_msg(msg).await,
None => {
warn!("Websocket connection silently closed, giving up!");
break 'retry
}
},
_ = cmd_rx.recv() => {break 'retry},
};
if let Err(error) = res {
debug!(?error, "WsError");
if matches!(
error,
DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
) {
retry_count += 1;
let mut guard = this.inner.as_ref().lock().await;
*guard = None;
warn!(
?error,
?retry_count,
"Connection dropped unexpectedly; Reconnecting..."
);
break;
} else {
error!(?error, "Fatal error; Exiting");
result = Err(error);
break 'retry;
}
}
}
}
debug!(
retry_count,
max_reconnects=?this.max_reconnects,
"Reconnection loop ended"
);
let mut guard = this.inner.as_ref().lock().await;
*guard = None;
if retry_count >= this.max_reconnects {
error!("Max reconnection attempts reached; Exiting");
this.dead.store(true, Ordering::SeqCst);
this.conn_notify.notify_waiters(); result = Err(DeltasError::ConnectionClosed);
}
result
});
self.conn_notify.notified().await;
if self.is_connected().await {
Ok(jh)
} else {
Err(DeltasError::NotConnected)
}
}
#[instrument(skip(self))]
async fn close(&self) -> Result<(), DeltasError> {
info!("Closing TychoWebsocketClient");
let mut guard = self.inner.lock().await;
let inner = guard
.as_mut()
.ok_or_else(|| DeltasError::NotConnected)?;
inner
.cmd_tx
.send(())
.await
.map_err(|e| DeltasError::TransportError(e.to_string()))?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::{net::SocketAddr, str::FromStr};
use tokio::{net::TcpListener, time::timeout};
use tycho_common::dto::Chain;
use super::*;
#[derive(Clone)]
enum ExpectedComm {
Receive(u64, tungstenite::protocol::Message),
Send(tungstenite::protocol::Message),
}
async fn mock_tycho_ws(
messages: &[ExpectedComm],
reconnects: usize,
) -> (SocketAddr, JoinHandle<()>) {
info!("Starting mock webserver");
let server = TcpListener::bind("127.0.0.1:0")
.await
.expect("localhost bind failed");
let addr = server.local_addr().unwrap();
let messages = messages.to_vec();
let jh = tokio::spawn(async move {
info!("mock webserver started");
for _ in 0..(reconnects + 1) {
info!("Awaiting client connections");
if let Ok((stream, _)) = server.accept().await {
info!("Client connected");
let mut websocket = tokio_tungstenite::accept_async(stream)
.await
.unwrap();
info!("Handling messages..");
for c in messages.iter().cloned() {
match c {
ExpectedComm::Receive(t, exp) => {
info!("Awaiting message...");
let msg = timeout(Duration::from_millis(t), websocket.next())
.await
.expect("Receive timeout")
.expect("Stream exhausted")
.expect("Failed to receive message.");
info!("Message received");
assert_eq!(msg, exp)
}
ExpectedComm::Send(data) => {
info!("Sending message");
websocket
.send(data)
.await
.expect("Failed to send message");
info!("Message sent");
}
};
}
info!("Mock communication completed");
sleep(Duration::from_millis(100)).await;
let _ = websocket.close(None).await;
info!("Mock server closed connection");
}
}
info!("mock server ended");
});
(addr, jh)
}
const SUBSCRIPTION_ID: &str = "30b740d1-cf09-4e0e-8cfe-b1434d447ece";
fn subscribe() -> String {
subscribe_with_compression(false)
}
fn subscribe_with_compression(compression: bool) -> String {
serde_json::json!({
"method": "subscribe",
"extractor_id": {
"chain": "ethereum",
"name": "vm:ambient"
},
"include_state": true,
"compression": compression,
"partial_blocks": false
})
.to_string()
}
fn subscription_confirmation() -> String {
r#"
{
"method": "newsubscription",
"extractor_id":{
"chain": "ethereum",
"name": "vm:ambient"
},
"subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
}
"#
.replace(|c: char| c.is_whitespace(), "")
}
fn block_deltas() -> String {
r#"
{
"subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
"deltas": {
"extractor": "vm:ambient",
"chain": "ethereum",
"block": {
"number": 123,
"hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"chain": "ethereum",
"ts": "2023-09-14T00:00:00"
},
"finalized_block_height": 0,
"revert": false,
"new_tokens": {},
"account_updates": {
"0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
"address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
"chain": "ethereum",
"slots": {},
"balance": "0x01f4",
"code": "",
"change": "Update"
}
},
"state_updates": {
"component_1": {
"component_id": "component_1",
"updated_attributes": {"attr1": "0x01"},
"deleted_attributes": ["attr2"]
}
},
"new_protocol_components":
{ "protocol_1": {
"id": "protocol_1",
"protocol_system": "system_1",
"protocol_type_name": "type_1",
"chain": "ethereum",
"tokens": ["0x01", "0x02"],
"contract_ids": ["0x01", "0x02"],
"static_attributes": {"attr1": "0x01f4"},
"change": "Update",
"creation_tx": "0x01",
"created_at": "2023-09-14T00:00:00"
}
},
"deleted_protocol_components": {},
"component_balances": {
"protocol_1":
{
"0x01": {
"token": "0x01",
"balance": "0x01f4",
"balance_float": 0.0,
"modify_tx": "0x01",
"component_id": "protocol_1"
}
}
},
"account_balances": {
"0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
"0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
"account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
"token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
"balance": "0x01f4",
"modify_tx": "0x01"
}
}
},
"component_tvl": {
"protocol_1": 1000.0
},
"dci_update": {
"new_entrypoints": {},
"new_entrypoint_params": {},
"trace_results": {}
}
}
}
"#.replace(|c: char| c.is_whitespace(), "")
}
fn unsubscribe() -> String {
r#"
{
"method": "unsubscribe",
"subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
}
"#
.replace(|c: char| c.is_whitespace(), "")
}
fn subscription_ended() -> String {
r#"
{
"method": "subscriptionended",
"subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
}
"#
.replace(|c: char| c.is_whitespace(), "")
}
#[tokio::test]
async fn test_uncompressed_subscribe_receive() {
let exp_comm = [
ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
ExpectedComm::Send(tungstenite::protocol::Message::Text(block_deltas())),
];
let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
let jh = client
.connect()
.await
.expect("connect failed");
let (_, mut rx) = timeout(
Duration::from_millis(100),
client.subscribe(
ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
SubscriptionOptions::new().with_compression(false),
),
)
.await
.expect("subscription timed out")
.expect("subscription failed");
let _ = timeout(Duration::from_millis(100), rx.recv())
.await
.expect("awaiting message timeout out")
.expect("receiving message failed");
timeout(Duration::from_millis(100), client.close())
.await
.expect("close timed out")
.expect("close failed");
jh.await
.expect("ws loop errored")
.unwrap();
server_thread.await.unwrap();
}
#[tokio::test]
async fn test_compressed_subscribe_receive() {
let compressed_block_deltas = zstd::encode_all(
block_deltas().as_bytes(),
0, )
.expect("Failed to compress block deltas message");
let exp_comm = [
ExpectedComm::Receive(
100,
tungstenite::protocol::Message::Text(subscribe_with_compression(true)),
),
ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
ExpectedComm::Send(tungstenite::protocol::Message::Binary(compressed_block_deltas)),
];
let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
let jh = client
.connect()
.await
.expect("connect failed");
let (_, mut rx) = timeout(
Duration::from_millis(100),
client.subscribe(
ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
SubscriptionOptions::new().with_compression(true),
),
)
.await
.expect("subscription timed out")
.expect("subscription failed");
let _ = timeout(Duration::from_millis(100), rx.recv())
.await
.expect("awaiting message timeout out")
.expect("receiving message failed");
timeout(Duration::from_millis(100), client.close())
.await
.expect("close timed out")
.expect("close failed");
jh.await
.expect("ws loop errored")
.unwrap();
server_thread.await.unwrap();
}
#[tokio::test]
async fn test_unsubscribe() {
let exp_comm = [
ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(unsubscribe())),
ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_ended())),
];
let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
let jh = client
.connect()
.await
.expect("connect failed");
let (sub_id, mut rx) = timeout(
Duration::from_millis(100),
client.subscribe(
ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
SubscriptionOptions::new().with_compression(false),
),
)
.await
.expect("subscription timed out")
.expect("subscription failed");
timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
.await
.expect("unsubscribe timed out")
.expect("unsubscribe failed");
let res = timeout(Duration::from_millis(100), rx.recv())
.await
.expect("awaiting message timeout out");
assert!(res.is_none());
timeout(Duration::from_millis(100), client.close())
.await
.expect("close timed out")
.expect("close failed");
jh.await
.expect("ws loop errored")
.unwrap();
server_thread.await.unwrap();
}
#[tokio::test]
async fn test_subscription_unexpected_end() {
let exp_comm = [
ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_ended())),
];
let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
let jh = client
.connect()
.await
.expect("connect failed");
let (_, mut rx) = timeout(
Duration::from_millis(100),
client.subscribe(
ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
SubscriptionOptions::new().with_compression(false),
),
)
.await
.expect("subscription timed out")
.expect("subscription failed");
let res = timeout(Duration::from_millis(100), rx.recv())
.await
.expect("awaiting message timeout out");
assert!(res.is_none());
timeout(Duration::from_millis(100), client.close())
.await
.expect("close timed out")
.expect("close failed");
jh.await
.expect("ws loop errored")
.unwrap();
server_thread.await.unwrap();
}
#[test_log::test(tokio::test)]
async fn test_reconnect() {
let exp_comm = [
ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe()
)),
ExpectedComm::Send(tungstenite::protocol::Message::Text(
subscription_confirmation()
)),
ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
{
"subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
"deltas": {
"extractor": "vm:ambient",
"chain": "ethereum",
"block": {
"number": 123,
"hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"chain": "ethereum",
"ts": "2023-09-14T00:00:00"
},
"finalized_block_height": 0,
"revert": false,
"new_tokens": {},
"account_updates": {
"0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
"address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
"chain": "ethereum",
"slots": {},
"balance": "0x01f4",
"code": "",
"change": "Update"
}
},
"state_updates": {
"component_1": {
"component_id": "component_1",
"updated_attributes": {"attr1": "0x01"},
"deleted_attributes": ["attr2"]
}
},
"new_protocol_components": {
"protocol_1":
{
"id": "protocol_1",
"protocol_system": "system_1",
"protocol_type_name": "type_1",
"chain": "ethereum",
"tokens": ["0x01", "0x02"],
"contract_ids": ["0x01", "0x02"],
"static_attributes": {"attr1": "0x01f4"},
"change": "Update",
"creation_tx": "0x01",
"created_at": "2023-09-14T00:00:00"
}
},
"deleted_protocol_components": {},
"component_balances": {
"protocol_1": {
"0x01": {
"token": "0x01",
"balance": "0x01f4",
"balance_float": 1000.0,
"modify_tx": "0x01",
"component_id": "protocol_1"
}
}
},
"account_balances": {
"0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
"0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
"account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
"token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
"balance": "0x01f4",
"modify_tx": "0x01"
}
}
},
"component_tvl": {
"protocol_1": 1000.0
},
"dci_update": {
"new_entrypoints": {},
"new_entrypoint_params": {},
"trace_results": {}
}
}
}
"#.to_owned()
))
];
let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
let client = WsDeltasClient::new_with_reconnects(
&format!("ws://{addr}"),
None,
3,
Duration::from_millis(110),
)
.unwrap();
let jh: JoinHandle<Result<(), DeltasError>> = client
.connect()
.await
.expect("connect failed");
for _ in 0..2 {
dbg!("loop");
let (_, mut rx) = timeout(
Duration::from_millis(200),
client.subscribe(
ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
SubscriptionOptions::new().with_compression(false),
),
)
.await
.expect("subscription timed out")
.expect("subscription failed");
let _ = timeout(Duration::from_millis(100), rx.recv())
.await
.expect("awaiting message timeout out")
.expect("receiving message failed");
let res = timeout(Duration::from_millis(200), rx.recv())
.await
.expect("awaiting closed connection timeout out");
assert!(res.is_none());
}
let res = jh.await.expect("ws client join failed");
assert!(res.is_err());
server_thread
.await
.expect("ws server loop errored");
}
async fn mock_bad_connection_tycho_ws(accept_first: bool) -> (SocketAddr, JoinHandle<()>) {
let server = TcpListener::bind("127.0.0.1:0")
.await
.expect("localhost bind failed");
let addr = server.local_addr().unwrap();
let jh = tokio::spawn(async move {
while let Ok((stream, _)) = server.accept().await {
if accept_first {
let stream = tokio_tungstenite::accept_async(stream)
.await
.unwrap();
sleep(Duration::from_millis(10)).await;
drop(stream)
} else {
drop(stream);
}
}
});
(addr, jh)
}
#[test_log::test(tokio::test)]
async fn test_subscribe_dead_client_after_max_attempts() {
let (addr, _) = mock_bad_connection_tycho_ws(true).await;
let client = WsDeltasClient::new_with_reconnects(
&format!("ws://{addr}"),
None,
3,
Duration::from_secs(0),
)
.unwrap();
let join_handle = client.connect().await.unwrap();
let handle_res = join_handle.await.unwrap();
assert!(handle_res.is_err());
assert!(!client.is_connected().await);
let subscription_res = timeout(
Duration::from_millis(10),
client.subscribe(
ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
SubscriptionOptions::new(),
),
)
.await
.unwrap();
assert!(subscription_res.is_err());
}
#[test_log::test(tokio::test)]
async fn test_ws_client_retry_cooldown() {
let start = std::time::Instant::now();
let (addr, _) = mock_bad_connection_tycho_ws(false).await;
let client = WsDeltasClient::new_with_reconnects(
&format!("ws://{addr}"),
None,
3, Duration::from_millis(50), )
.unwrap();
let connect_result = client.connect().await;
let elapsed = start.elapsed();
assert!(connect_result.is_err(), "Expected connection to fail after retries");
assert!(
elapsed >= Duration::from_millis(100),
"Expected at least 100ms elapsed, got {:?}",
elapsed
);
assert!(elapsed < Duration::from_millis(500), "Took too long: {:?}", elapsed);
}
#[test_log::test(tokio::test)]
async fn test_buffer_full_triggers_unsubscribe() {
let exp_comm = {
[
ExpectedComm::Receive(
100,
tungstenite::protocol::Message::Text(
subscribe(),
),
),
ExpectedComm::Send(tungstenite::protocol::Message::Text(
subscription_confirmation(),
)),
ExpectedComm::Send(tungstenite::protocol::Message::Text(
r#"
{
"subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
"deltas": {
"extractor": "vm:ambient",
"chain": "ethereum",
"block": {
"number": 123,
"hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"chain": "ethereum",
"ts": "2023-09-14T00:00:00"
},
"finalized_block_height": 0,
"revert": false,
"new_tokens": {},
"account_updates": {},
"state_updates": {},
"new_protocol_components": {},
"deleted_protocol_components": {},
"component_balances": {},
"account_balances": {},
"component_tvl": {},
"dci_update": {
"new_entrypoints": {},
"new_entrypoint_params": {},
"trace_results": {}
}
}
}
"#.to_owned()
)),
ExpectedComm::Send(tungstenite::protocol::Message::Text(
r#"
{
"subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
"deltas": {
"extractor": "vm:ambient",
"chain": "ethereum",
"block": {
"number": 124,
"hash": "0x0000000000000000000000000000000000000000000000000000000000000001",
"parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"chain": "ethereum",
"ts": "2023-09-14T00:00:01"
},
"finalized_block_height": 0,
"revert": false,
"new_tokens": {},
"account_updates": {},
"state_updates": {},
"new_protocol_components": {},
"deleted_protocol_components": {},
"component_balances": {},
"account_balances": {},
"component_tvl": {},
"dci_update": {
"new_entrypoints": {},
"new_entrypoint_params": {},
"trace_results": {}
}
}
}
"#.to_owned()
)),
ExpectedComm::Receive(
100,
tungstenite::protocol::Message::Text(
unsubscribe(),
),
),
ExpectedComm::Send(tungstenite::protocol::Message::Text(
subscription_ended(),
)),
]
};
let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
let client = WsDeltasClient::new_with_custom_buffers(
&format!("ws://{addr}"),
None,
128, 1, )
.unwrap();
let jh = client
.connect()
.await
.expect("connect failed");
let (_sub_id, mut rx) = timeout(
Duration::from_millis(100),
client.subscribe(
ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
SubscriptionOptions::new().with_compression(false),
),
)
.await
.expect("subscription timed out")
.expect("subscription failed");
tokio::time::sleep(Duration::from_millis(100)).await;
let mut received_msgs = Vec::new();
while received_msgs.len() < 3 {
match timeout(Duration::from_millis(200), rx.recv()).await {
Ok(Some(msg)) => {
received_msgs.push(msg);
}
Ok(None) => {
break;
}
Err(_) => {
break;
}
}
}
assert!(
received_msgs.len() <= 1,
"Expected buffer overflow to limit messages to at most 1, got {}",
received_msgs.len()
);
if let Some(first_msg) = received_msgs.first() {
assert_eq!(first_msg.block.number, 123, "Expected first message with block 123");
}
drop(rx); tokio::time::sleep(Duration::from_millis(50)).await;
jh.abort();
server_thread.abort();
let _ = jh.await;
let _ = server_thread.await;
}
#[tokio::test]
async fn test_server_error_handling() {
use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
let error_response = WebSocketMessage::Response(Response::Error(
WebsocketError::ExtractorNotFound(extractor_id.clone()),
));
let error_json = serde_json::to_string(&error_response).unwrap();
let exp_comm = [
ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
];
let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
let jh = client
.connect()
.await
.expect("connect failed");
let result = timeout(
Duration::from_millis(100),
client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
)
.await
.expect("subscription timed out");
assert!(result.is_err());
if let Err(DeltasError::ServerError(msg, _)) = result {
assert!(msg.contains("Subscription failed"));
assert!(msg.contains("Extractor not found"));
} else {
panic!("Expected DeltasError::ServerError, got: {:?}", result);
}
timeout(Duration::from_millis(100), client.close())
.await
.expect("close timed out")
.expect("close failed");
jh.await
.expect("ws loop errored")
.unwrap();
server_thread.await.unwrap();
}
#[test_log::test(tokio::test)]
async fn test_subscription_not_found_error() {
use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
let subscription_id = Uuid::from_str(SUBSCRIPTION_ID).unwrap();
let error_response = WebSocketMessage::Response(Response::Error(
WebsocketError::SubscriptionNotFound(subscription_id),
));
let error_json = serde_json::to_string(&error_response).unwrap();
let exp_comm = [
ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(unsubscribe())),
ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
];
let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
let jh = client
.connect()
.await
.expect("connect failed");
let (received_sub_id, _rx) = timeout(
Duration::from_millis(100),
client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
)
.await
.expect("subscription timed out")
.expect("subscription failed");
assert_eq!(received_sub_id, subscription_id);
let unsubscribe_result =
timeout(Duration::from_millis(100), client.unsubscribe(subscription_id))
.await
.expect("unsubscribe timed out");
unsubscribe_result
.expect("Unsubscribe should succeed even if server says subscription not found");
timeout(Duration::from_millis(100), client.close())
.await
.expect("close timed out")
.expect("close failed");
jh.await
.expect("ws loop errored")
.unwrap();
server_thread.await.unwrap();
}
#[test_log::test(tokio::test)]
async fn test_parse_error_handling() {
use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
let error_response = WebSocketMessage::Response(Response::Error(
WebsocketError::ParseError("}2sdf".to_string(), "malformed JSON".to_string()),
));
let error_json = serde_json::to_string(&error_response).unwrap();
let exp_comm = [
ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
];
let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
let jh = client
.connect()
.await
.expect("connect failed");
let _ = timeout(
Duration::from_millis(100),
client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
)
.await
.expect("subscription timed out");
let result = jh
.await
.expect("ws loop should complete");
assert!(result.is_err());
if let Err(DeltasError::ServerError(message, _)) = result {
assert!(message.contains("Server failed to parse client message"));
} else {
panic!("Expected DeltasError::ServerError, got: {:?}", result);
}
server_thread.await.unwrap();
}
#[test_log::test(tokio::test)]
async fn test_compression_error_handling() {
use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
let subscription_id = Uuid::from_str(SUBSCRIPTION_ID).unwrap();
let error_response = WebSocketMessage::Response(Response::Error(
WebsocketError::CompressionError(subscription_id, "Compression failed".to_string()),
));
let error_json = serde_json::to_string(&error_response).unwrap();
let exp_comm = [
ExpectedComm::Receive(
100,
tungstenite::protocol::Message::Text(subscribe_with_compression(true)),
),
ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
];
let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
let jh = client
.connect()
.await
.expect("connect failed");
let _ = timeout(
Duration::from_millis(100),
client.subscribe(extractor_id, SubscriptionOptions::new()),
)
.await
.expect("subscription timed out");
let result = jh
.await
.expect("ws loop should complete");
assert!(result.is_err());
if let Err(DeltasError::ServerError(message, _)) = result {
assert!(message.contains("Server failed to compress message for subscription"));
} else {
panic!("Expected DeltasError::ServerError, got: {:?}", result);
}
server_thread.await.unwrap();
}
#[tokio::test]
async fn test_subscribe_error_handling() {
use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
let error_response = WebSocketMessage::Response(Response::Error(
WebsocketError::SubscribeError(extractor_id.clone()),
));
let error_json = serde_json::to_string(&error_response).unwrap();
let exp_comm = [
ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
];
let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
let jh = client
.connect()
.await
.expect("connect failed");
let result = timeout(
Duration::from_millis(100),
client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
)
.await
.expect("subscription timed out");
assert!(result.is_err());
if let Err(DeltasError::ServerError(msg, _)) = result {
assert!(msg.contains("Subscription failed"));
assert!(msg.contains("Failed to subscribe to extractor"));
} else {
panic!("Expected DeltasError::ServerError, got: {:?}", result);
}
timeout(Duration::from_millis(100), client.close())
.await
.expect("close timed out")
.expect("close failed");
jh.await
.expect("ws loop errored")
.unwrap();
server_thread.await.unwrap();
}
#[tokio::test]
async fn test_cancel_pending_subscription() {
use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
let error_response = WebSocketMessage::Response(Response::Error(
WebsocketError::ExtractorNotFound(extractor_id.clone()),
));
let error_json = serde_json::to_string(&error_response).unwrap();
let exp_comm = [
ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
];
let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
let jh = client
.connect()
.await
.expect("connect failed");
let client_clone = client.clone();
let extractor_id_clone = extractor_id.clone();
let subscription1 = tokio::spawn({
let client_for_spawn = client.clone();
async move {
client_for_spawn
.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false))
.await
}
});
let subscription2 = tokio::spawn(async move {
client_clone
.subscribe(extractor_id_clone, SubscriptionOptions::new())
.await
});
let (result1, result2) = tokio::join!(subscription1, subscription2);
let result1 = result1.unwrap();
let result2 = result2.unwrap();
assert!(result1.is_err() || result2.is_err());
if let Err(DeltasError::SubscriptionAlreadyPending) = result2 {
} else if let Err(DeltasError::ServerError(_, _)) = result1 {
} else {
panic!("Expected one SubscriptionAlreadyPending and one ServerError");
}
timeout(Duration::from_millis(100), client.close())
.await
.expect("close timed out")
.expect("close failed");
jh.await
.expect("ws loop errored")
.unwrap();
server_thread.await.unwrap();
}
#[tokio::test]
async fn test_force_unsubscribe_prevents_multiple_calls() {
let subscription_id = Uuid::from_str(SUBSCRIPTION_ID).unwrap();
let exp_comm = [
ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(unsubscribe())),
ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_ended())),
];
let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
let jh = client
.connect()
.await
.expect("connect failed");
let (received_sub_id, _rx) = timeout(
Duration::from_millis(100),
client.subscribe(
ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
SubscriptionOptions::new().with_compression(false),
),
)
.await
.expect("subscription timed out")
.expect("subscription failed");
assert_eq!(received_sub_id, subscription_id);
{
let mut inner_guard = client.inner.lock().await;
let inner = inner_guard
.as_mut()
.expect("client should be connected");
WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
}
tokio::time::sleep(Duration::from_millis(50)).await;
let _ = timeout(Duration::from_millis(100), client.close()).await;
let _ = jh.await;
let _ = server_thread.await;
}
}