use std::time::Duration;
use zero_engine_client::{EngineEvent, EngineState, JitterMode, ReconnectConfig, WsSubscriber};
use zero_testkit::mock_engine::MockEngine;
/// Polls `predicate` until it returns `true` or `timeout` has elapsed.
///
/// The predicate is re-evaluated after a 10 ms sleep between attempts. Once the
/// deadline has passed, the predicate is evaluated one final time and that
/// result is returned, so a condition that became true during the last sleep
/// is still reported as success.
///
/// Returns `true` if the predicate held at any check, `false` otherwise.
async fn wait_for<F>(timeout: Duration, mut predicate: F) -> bool
where
    F: FnMut() -> bool,
{
    let give_up_at = tokio::time::Instant::now() + timeout;
    loop {
        if tokio::time::Instant::now() >= give_up_at {
            // Deadline reached: the last evaluation decides the outcome.
            return predicate();
        }
        if predicate() {
            return true;
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}
/// A subscriber spawned against the mock engine must set
/// `connection.ws_connected` in the shared state within two seconds.
#[tokio::test]
async fn connects_and_marks_state() {
    let engine = MockEngine::spawn().await.expect("spawn mock");
    let shared = EngineState::shared();
    let subscriber =
        WsSubscriber::spawn(&engine.ws_url(), None, shared.clone()).expect("subscribe");

    // Poll the shared state rather than sleeping for a fixed amount of time.
    let is_connected = || shared.read().connection.ws_connected;
    assert!(
        wait_for(Duration::from_secs(2), is_connected).await,
        "subscriber did not mark ws_connected within 2s"
    );

    // Stop the subscriber first, then the mock it is connected to.
    subscriber.shutdown().await.expect("shutdown");
    engine.shutdown().await;
}
/// Positions and risk must both show up in the shared `EngineState` after the
/// subscriber is started, and must carry the expected contents: exactly one
/// position with symbol `"BTC"`, and a risk value that is not halted.
#[tokio::test]
async fn applies_positions_and_risk_from_ws() {
    let engine = MockEngine::spawn().await.expect("spawn mock");
    let shared = EngineState::shared();
    let subscriber =
        WsSubscriber::spawn(&engine.ws_url(), None, shared.clone()).expect("subscribe");

    // Each slot gets its own two-second budget.
    let budget = Duration::from_secs(2);
    assert!(
        wait_for(budget, || shared.read().positions.is_some()).await,
        "positions were not applied to EngineState"
    );
    assert!(
        wait_for(budget, || shared.read().risk.is_some()).await,
        "risk was not applied to EngineState"
    );

    {
        // Inspect both values through one guard; the scope releases it before shutdown.
        let snapshot = shared.read();

        let items = &snapshot.positions.as_ref().unwrap().value.items;
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].symbol, "BTC");

        let risk_value = &snapshot.risk.as_ref().unwrap().value;
        assert!(!risk_value.is_halted());
    }

    subscriber.shutdown().await.expect("shutdown");
    engine.shutdown().await;
}
/// The subscriber's event receiver must deliver at least one `Heartbeat`,
/// one `Positions` and one `Risk` event within two seconds.
#[tokio::test]
async fn broadcast_receives_typed_events() {
    let engine = MockEngine::spawn().await.expect("spawn mock");
    let shared = EngineState::shared();
    let subscriber =
        WsSubscriber::spawn(&engine.ws_url(), None, shared.clone()).expect("subscribe");
    let mut events = subscriber.events();

    let (mut heartbeat_seen, mut positions_seen, mut risk_seen) = (false, false, false);
    let cutoff = tokio::time::Instant::now() + Duration::from_secs(2);

    loop {
        let everything_seen = heartbeat_seen && positions_seen && risk_seen;
        if everything_seen || tokio::time::Instant::now() >= cutoff {
            break;
        }

        // Each receive is capped at 200 ms so the overall cutoff is re-checked
        // regularly. Receive errors and per-receive timeouts are ignored and
        // the loop simply tries again.
        let received = tokio::time::timeout(Duration::from_millis(200), events.recv()).await;
        if let Ok(Ok(event)) = received {
            match event {
                EngineEvent::Heartbeat(_) => heartbeat_seen = true,
                EngineEvent::Positions(_) => positions_seen = true,
                EngineEvent::Risk(_) => risk_seen = true,
                // Other event kinds are irrelevant to this test.
                _ => {}
            }
        }
    }

    assert!(heartbeat_seen, "no heartbeat event");
    assert!(positions_seen, "no positions event");
    assert!(risk_seen, "no risk event");

    subscriber.shutdown().await.expect("shutdown");
    engine.shutdown().await;
}
/// With the mock's `ws_drop_once` override set, the subscriber must end up
/// connected with positions applied, having made at least two connection
/// attempts and with `reconnect_count` back at zero.
///
/// NOTE(review): `ws_drop_once` presumably makes the mock drop the first
/// WebSocket connection a single time — confirm against `MockEngine`.
#[tokio::test]
async fn reconnects_after_peer_drop() {
    let mock = MockEngine::spawn().await.expect("spawn mock");
    mock.with_overrides(|o| o.ws_drop_once = true);
    let state = EngineState::shared();

    // Short backoff without jitter keeps the retry well inside the 2 s wait below.
    let sub = WsSubscriber::spawn_with_config(
        &mock.ws_url(),
        None,
        state.clone(),
        ReconnectConfig {
            initial_backoff: Duration::from_millis(20),
            max_backoff: Duration::from_millis(200),
            multiplier: 2,
            jitter: JitterMode::None,
        },
    )
    .expect("subscribe");

    let reconnected = wait_for(Duration::from_secs(2), || {
        // Read both fields through a single guard so they come from the same
        // snapshot of the shared state instead of two separate lock acquisitions.
        let s = state.read();
        s.connection.ws_connected && s.positions.is_some()
    })
    .await;
    assert!(reconnected, "subscriber did not recover after peer drop");

    {
        // Check both counters against one snapshot; the scope releases the
        // guard before the subscriber is shut down.
        let s = state.read();
        assert!(
            s.connection.total_attempts >= 2,
            "total_attempts should be >= 2 (first drop + successful reconnect); got {}",
            s.connection.total_attempts
        );
        assert_eq!(
            s.connection.reconnect_count,
            0,
            "reconnect_count must reset to 0 on success"
        );
    }

    sub.shutdown().await.expect("shutdown");
    mock.shutdown().await;
}
/// `shutdown()` must return within two seconds, and return `Ok`, even though
/// the subscriber was pointed at `ws://127.0.0.1:1/ws`.
///
/// NOTE(review): port 1 is presumably chosen because nothing listens there,
/// so every connection attempt fails — confirm for the CI environment.
#[tokio::test]
async fn shutdown_is_clean_when_peer_is_down() {
    let shared = EngineState::shared();
    let fast_retry = ReconnectConfig {
        initial_backoff: Duration::from_millis(20),
        max_backoff: Duration::from_millis(100),
        multiplier: 2,
        jitter: JitterMode::None,
    };
    let subscriber =
        WsSubscriber::spawn_with_config("ws://127.0.0.1:1/ws", None, shared, fast_retry)
            .expect("subscribe");

    // Give the subscriber some time to run before asking it to stop.
    tokio::time::sleep(Duration::from_millis(80)).await;

    // The outer `Err` means the 2 s timeout fired; the inner result is shutdown's own.
    match tokio::time::timeout(Duration::from_secs(2), subscriber.shutdown()).await {
        Ok(outcome) => {
            outcome.expect("shutdown ok");
        }
        Err(_) => panic!("shutdown did not return within 2s"),
    }
}