use digdigdig3::core::rt::WsFrame;
use digdigdig3::core::websocket::reconnect::ReconnectConfig;
use digdigdig3::core::websocket::transport::UniversalWsTransport;
use digdigdig3::core::websocket::protocol::WsProtocol;
use digdigdig3::core::websocket::stream_spec::StreamSpec;
use digdigdig3::core::websocket::topic_registry::TopicRegistry;
use digdigdig3::core::types::{AccountType, ConnectionStatus, WebSocketError};
use std::time::Duration;
use tokio::net::TcpListener;
use tokio_tungstenite::accept_async;
use url::Url;
/// Test double implementing `WsProtocol` for the silent-connection tests in this file.
///
/// It supplies no ping frame, no auth frame and never extracts a topic
/// (see the trait impl below).
struct SilentProtocol {
/// URL returned by `endpoint()` regardless of the arguments it is given.
url: Url,
/// Registry produced by `TopicRegistry::builder().build()` with no further
/// builder calls; handed out by reference from `topic_registry()`.
registry: TopicRegistry,
}
impl SilentProtocol {
    /// Creates a protocol whose endpoint is always `url`.
    ///
    /// The topic registry is built straight from the builder without any
    /// additional configuration.
    fn new(url: Url) -> Self {
        let registry = TopicRegistry::builder().build();
        Self { url, registry }
    }
}
/// `WsProtocol` implementation for the silent mock: fixed endpoint, no ping
/// frame, no auth frame, and `{}` as the payload for every (un)subscribe.
impl WsProtocol for SilentProtocol {
    /// Static identifier of this mock protocol.
    fn name(&self) -> &'static str {
        "silent_mock"
    }

    /// Always returns a copy of the URL given at construction; both
    /// arguments are ignored.
    fn endpoint(&self, _account_type: AccountType, _flag: bool) -> Url {
        self.url.clone()
    }

    /// This mock supplies no ping frame.
    fn ping_frame(&self) -> Option<WsFrame> {
        None
    }

    /// Ping interval reported to the transport: 100 ms.
    fn ping_interval(&self) -> Duration {
        Duration::from_millis(100)
    }

    /// Returns an empty JSON object as the subscribe frame for any spec.
    fn subscribe_frame(&self, _spec: &StreamSpec) -> Result<WsFrame, WebSocketError> {
        let payload = String::from("{}");
        Ok(WsFrame::Text(payload))
    }

    /// Returns an empty JSON object as the unsubscribe frame for any spec.
    fn unsubscribe_frame(&self, _spec: &StreamSpec) -> Result<WsFrame, WebSocketError> {
        let payload = String::from("{}");
        Ok(WsFrame::Text(payload))
    }

    /// No authentication frame is produced, whatever the credentials.
    fn auth_frame(
        &self,
        _credentials: &digdigdig3::core::traits::Credentials,
    ) -> Option<Result<WsFrame, WebSocketError>> {
        None
    }

    /// Never resolves a topic from an incoming message.
    fn extract_topic(
        &self,
        _message: &serde_json::Value,
    ) -> Option<digdigdig3::core::websocket::topic_registry::TopicKey> {
        None
    }

    /// Hands out the single registry owned by this mock for every account type.
    fn topic_registry(&self, _account_type: AccountType) -> &TopicRegistry {
        &self.registry
    }
}
/// Starts a local WebSocket server that accepts connections and then stays silent.
///
/// Binds to `127.0.0.1` on an OS-assigned port, spawns a background accept
/// loop, and returns the bound address. Each accepted TCP connection gets its
/// own task that performs the WebSocket handshake and, on success, keeps the
/// socket bound for 60 seconds without reading from or writing to it.
///
/// # Panics
///
/// Panics if the listener cannot be bound or its local address cannot be read.
async fn spawn_silent_ws_server() -> std::net::SocketAddr {
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let bound_addr = listener.local_addr().unwrap();

    tokio::spawn(async move {
        loop {
            // Failed accepts are skipped; the loop simply tries again.
            let tcp = match listener.accept().await {
                Ok((stream, _peer)) => stream,
                Err(_) => continue,
            };

            tokio::spawn(async move {
                match accept_async(tcp).await {
                    // Hold the upgraded socket in a binding so it stays open
                    // while this task sleeps.
                    Ok(_held_open) => {
                        tokio::time::sleep(Duration::from_secs(60)).await;
                    }
                    Err(_) => {}
                }
            });
        }
    });

    bound_addr
}
/// Connects a transport to the silent mock server and checks that, after a
/// period with no inbound traffic, the transport is not left `Disconnected`.
///
/// Steps:
/// 1. Start the silent server and point `SilentProtocol` at it.
/// 2. Poll for up to 3 s until the transport reports `Connected`.
/// 3. Wait a further 600 ms, then assert the status is anything but
///    `Disconnected`.
///
/// NOTE(review): presumably the silence threshold is
/// `ping_interval * silent_multiplier` (100 ms * 2) — confirm against the
/// transport. The final assertion only rules out `Disconnected`; it does not
/// by itself prove that the watchdog actually fired.
#[tokio::test]
async fn silent_watchdog_fires_and_reconnects() {
    let server_addr = spawn_silent_ws_server().await;
    let endpoint = format!("ws://{server_addr}");
    let url = Url::parse(&endpoint).unwrap();

    let reconnect_cfg = ReconnectConfig {
        initial_delay_ms: 50,
        max_delay_ms: 50,
        silent_multiplier: 2,
        connection_timeout_ms: 2_000,
        ..ReconnectConfig::default()
    };

    let transport = UniversalWsTransport::with_reconnect(
        SilentProtocol::new(url),
        AccountType::Spot,
        false,
        None,
        reconnect_cfg,
    );

    // Wait for the initial connection, failing the test once the deadline passes.
    let connect_deadline = tokio::time::Instant::now() + Duration::from_secs(3);
    while transport.connection_status() != ConnectionStatus::Connected {
        assert!(
            tokio::time::Instant::now() < connect_deadline,
            "transport never connected to mock server"
        );
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    // Stay idle so the connection sees no inbound frames for a while.
    tokio::time::sleep(Duration::from_millis(600)).await;

    let status = transport.connection_status();
    assert_ne!(
        status,
        ConnectionStatus::Disconnected,
        "transport should be reconnecting or connected after watchdog fired, got {status:?}"
    );
}
/// The default `ReconnectConfig` carries a `silent_multiplier` of 2.
#[test]
fn silent_multiplier_default_is_2() {
    assert_eq!(ReconnectConfig::default().silent_multiplier, 2);
}
/// A `silent_multiplier` set through struct-update syntax is kept as given.
#[test]
fn silent_multiplier_custom() {
    let custom = 5;
    let cfg = ReconnectConfig {
        silent_multiplier: custom,
        ..Default::default()
    };
    assert_eq!(cfg.silent_multiplier, custom);
}
/// Placeholder for a test that, judging by its name, should verify the silent
/// watchdog does not trigger while the stream is actively delivering data.
///
/// The body is empty, so the test asserts nothing. It is ignored with an
/// explicit reason so that test output makes clear this is unimplemented
/// rather than real coverage.
#[tokio::test]
#[ignore = "placeholder: test body not implemented yet"]
async fn silent_watchdog_does_not_false_positive_on_active_stream() {
    // Intentionally empty until the scenario is implemented.
}