use crate::connection::WebSocketConnection;
use crate::error::{DXLinkError, DXLinkResult};
use crate::events::{CompactData, EventType, MarketEvent};
use crate::messages::{
AuthMessage, AuthStateMessage, BaseMessage, ChannelRequestMessage, ErrorMessage,
FeedDataMessage, FeedSetupMessage, FeedSubscription, FeedSubscriptionMessage, KeepaliveMessage,
SetupMessage,
};
use crate::parse_compact_data;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};
const DEFAULT_KEEPALIVE_TIMEOUT: u32 = 60;
const DEFAULT_KEEPALIVE_INTERVAL: u32 = 15;
const DEFAULT_CLIENT_VERSION: &str = "1.0.2-dxlink-0.1.3";
const MAIN_CHANNEL: u32 = 0;
pub type EventCallback = Box<dyn Fn(MarketEvent) + Send + Sync + 'static>;
#[derive(Debug)]
enum ResponseType {
ChannelOpened(u32),
FeedConfig(u32),
ChannelClosed(u32),
Error(String),
#[allow(dead_code)]
Other(String),
}
#[derive(Debug)]
struct ResponseRequest {
expected_type: String,
channel_id: Option<u32>,
response_sender: oneshot::Sender<ResponseType>,
}
pub struct DXLinkClient {
url: String,
token: String,
connection: Option<WebSocketConnection>,
keepalive_timeout: u32,
next_channel_id: Arc<Mutex<u32>>,
channels: Arc<Mutex<HashMap<u32, String>>>, callbacks: Arc<Mutex<HashMap<String, EventCallback>>>, subscriptions: Arc<Mutex<HashSet<(EventType, String)>>>, event_sender: Option<Sender<MarketEvent>>,
keepalive_handle: Option<JoinHandle<()>>,
message_handle: Option<JoinHandle<()>>,
keepalive_sender: Option<Sender<()>>,
response_requests: Arc<Mutex<Vec<ResponseRequest>>>,
}
impl DXLinkClient {
pub fn new(url: &str, token: &str) -> Self {
Self {
url: url.to_string(),
token: token.to_string(),
connection: None,
keepalive_timeout: DEFAULT_KEEPALIVE_TIMEOUT,
next_channel_id: Arc::new(Mutex::new(1)), channels: Arc::new(Mutex::new(HashMap::new())),
callbacks: Arc::new(Mutex::new(HashMap::new())),
subscriptions: Arc::new(Mutex::new(HashSet::new())),
event_sender: None,
keepalive_handle: None,
message_handle: None,
keepalive_sender: None,
response_requests: Arc::new(Mutex::new(Vec::new())),
}
}
pub async fn connect(&mut self) -> DXLinkResult<Receiver<MarketEvent>> {
let connection = WebSocketConnection::connect(&self.url).await?;
let setup_msg = SetupMessage {
channel: MAIN_CHANNEL,
message_type: "SETUP".to_string(),
keepalive_timeout: self.keepalive_timeout,
accept_keepalive_timeout: self.keepalive_timeout,
version: DEFAULT_CLIENT_VERSION.to_string(),
};
connection.send(&setup_msg).await?;
let response = connection.receive().await?;
let _: SetupMessage = serde_json::from_str(&response)?;
let response = connection.receive().await?;
let auth_state: AuthStateMessage = serde_json::from_str(&response)?;
if auth_state.state == "AUTHORIZED" {
info!("Already authorized to DXLink server");
} else if auth_state.state == "UNAUTHORIZED" {
let auth_msg = AuthMessage {
channel: MAIN_CHANNEL,
message_type: "AUTH".to_string(),
token: self.token.clone(),
};
connection.send(&auth_msg).await?;
let response = connection.receive().await?;
let auth_state: AuthStateMessage = serde_json::from_str(&response)?;
if auth_state.state != "AUTHORIZED" {
return Err(DXLinkError::Authentication(format!(
"Authentication failed. State: {}",
auth_state.state
)));
}
info!("Successfully authenticated to DXLink server");
} else {
return Err(DXLinkError::Protocol(format!(
"Unexpected authentication state: {}",
auth_state.state
)));
}
info!("Successfully connected to DXLink server");
self.connection = Some(connection);
let receiver = self.event_stream();
self.start_message_processing()?;
self.start_keepalive()?;
receiver
}
#[allow(dead_code)]
async fn wait_for_response(
&self,
expected_type: &str,
channel_id: Option<u32>,
timeout: Duration,
) -> DXLinkResult<ResponseType> {
let (tx, rx) = oneshot::channel();
{
let mut requests = self.response_requests.lock().unwrap();
requests.push(ResponseRequest {
expected_type: expected_type.to_string(),
channel_id,
response_sender: tx,
});
}
match tokio::time::timeout(timeout, rx).await {
Ok(Ok(response)) => Ok(response),
Ok(Err(_)) => Err(DXLinkError::Protocol("Response channel closed".to_string())),
Err(_) => Err(DXLinkError::Timeout(format!(
"Timed out waiting for {} message{}",
expected_type,
channel_id.map_or("".to_string(), |id| format!(" for channel {}", id))
))),
}
}
fn start_keepalive(&mut self) -> DXLinkResult<()> {
if self.connection.is_none() {
return Err(DXLinkError::Connection(
"Cannot start keepalive without a connection".to_string(),
));
}
let (tx, mut rx) = mpsc::channel::<()>(1);
self.keepalive_sender = Some(tx);
let connection = self.connection.as_ref().unwrap().clone();
let keepalive_interval = Duration::from_secs(DEFAULT_KEEPALIVE_INTERVAL as u64);
let keepalive_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(keepalive_interval);
loop {
tokio::select! {
_ = interval.tick() => {
let keepalive_msg = KeepaliveMessage {
channel: MAIN_CHANNEL,
message_type: "KEEPALIVE".to_string(),
};
match connection.send(&keepalive_msg).await {
Ok(_) => {
debug!("Sent keepalive message");
},
Err(e) => {
error!("Failed to send keepalive: {}", e);
break;
}
}
}
_ = rx.recv() => {
debug!("Keepalive task received shutdown signal");
break;
}
}
}
debug!("Keepalive task terminated");
});
self.keepalive_handle = Some(keepalive_handle);
Ok(())
}
fn start_message_processing(&mut self) -> DXLinkResult<()> {
if self.connection.is_none() {
return Err(DXLinkError::Connection(
"Cannot start message processing without a connection".to_string(),
));
}
let connection = self.connection.as_ref().unwrap().clone();
let callbacks = self.callbacks.clone();
let event_sender = self.event_sender.clone();
let response_requests = self.response_requests.clone();
let message_handle = tokio::spawn(async move {
loop {
match connection.receive().await {
Ok(msg) => {
debug!("Received message: {}", msg);
if let Ok(value) = serde_json::from_str::<serde_json::Value>(&msg) {
let msg_type = value.get("type").and_then(|v| v.as_str()).unwrap_or("");
let channel = value
.get("channel")
.and_then(|v| v.as_u64())
.map(|c| c as u32);
{
let mut requests = response_requests.lock().unwrap();
if let Some(idx) = requests.iter().position(|req| {
req.expected_type == msg_type
&& (req.channel_id.is_none() || req.channel_id == channel)
}) {
let request = requests.remove(idx);
let response = match msg_type {
"CHANNEL_OPENED" => {
ResponseType::ChannelOpened(channel.unwrap_or(0))
}
"FEED_CONFIG" => {
ResponseType::FeedConfig(channel.unwrap_or(0))
}
"CHANNEL_CLOSED" => {
ResponseType::ChannelClosed(channel.unwrap_or(0))
}
"ERROR" => {
let error = value
.get("error")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let message = value
.get("message")
.and_then(|v| v.as_str())
.unwrap_or("");
ResponseType::Error(format!("{} - {}", error, message))
}
_ => ResponseType::Other(msg.clone()),
};
let _ = request.response_sender.send(response);
continue; }
}
match msg_type {
"FEED_DATA" => {
if let Ok(data_msg) = serde_json::from_str::<
FeedDataMessage<Vec<CompactData>>,
>(&msg)
{
let events = parse_compact_data(&data_msg.data);
for event in events {
let symbol = match &event {
MarketEvent::Quote(e) => &e.event_symbol,
MarketEvent::Trade(e) => &e.event_symbol,
MarketEvent::Greeks(e) => &e.event_symbol,
};
if let Ok(callbacks) = callbacks.lock()
&& let Some(callback) = callbacks.get(symbol)
{
callback(event.clone());
}
if let Some(tx) = &event_sender
&& let Err(e) = tx.send(event.clone()).await
{
error!("Failed to send event to channel: {}", e);
}
}
}
}
"ERROR" => {
if let Ok(error_msg) =
serde_json::from_str::<ErrorMessage>(&msg)
{
error!(
"Received error from server: {} - {}",
error_msg.error, error_msg.message
);
}
}
"KEEPALIVE" => {
debug!("Received KEEPALIVE message");
}
_ => {
debug!("Received unhandled message type: {}", msg_type);
}
}
}
}
Err(e) => {
error!("Error receiving message: {}", e);
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
});
self.message_handle = Some(message_handle);
Ok(())
}
pub async fn disconnect(&mut self) -> DXLinkResult<()> {
if let Some(sender) = &self.keepalive_sender {
let _ = sender.send(()).await;
self.keepalive_sender = None;
}
if let Some(handle) = self.keepalive_handle.take() {
handle.abort();
}
if let Some(handle) = self.message_handle.take() {
handle.abort();
}
let channels_to_close = {
let channels = self.channels.lock().unwrap();
channels.keys().cloned().collect::<Vec<_>>()
};
for channel_id in channels_to_close {
if let Err(e) = self.close_channel(channel_id).await {
warn!("Error closing channel {}: {}", channel_id, e);
}
}
self.connection = None;
info!("Disconnected from DXLink server");
Ok(())
}
pub async fn create_feed_channel(&mut self, contract: &str) -> DXLinkResult<u32> {
let channel_id = self.next_channel_id()?;
let mut params = HashMap::new();
params.insert("contract".to_string(), contract.to_string());
let channel_request = ChannelRequestMessage {
channel: channel_id,
message_type: "CHANNEL_REQUEST".to_string(),
service: "FEED".to_string(),
parameters: params,
};
let (tx, rx) = oneshot::channel();
{
let mut requests = self.response_requests.lock().unwrap();
requests.push(ResponseRequest {
expected_type: "CHANNEL_OPENED".to_string(),
channel_id: Some(channel_id),
response_sender: tx,
});
}
let conn = self.get_connection_mut()?;
conn.send(&channel_request).await?;
let response = match tokio::time::timeout(Duration::from_secs(10), rx).await {
Ok(Ok(response)) => response,
Ok(Err(_)) => return Err(DXLinkError::Protocol("Response channel closed".to_string())),
Err(_) => {
return Err(DXLinkError::Timeout(format!(
"Timed out waiting for CHANNEL_OPENED message for channel {}",
channel_id
)));
}
};
match response {
ResponseType::ChannelOpened(received_channel) => {
if received_channel != channel_id {
return Err(DXLinkError::Channel(format!(
"Expected channel ID {}, got {}",
channel_id, received_channel
)));
}
{
let mut channels = self.channels.lock().unwrap();
channels.insert(channel_id, "FEED".to_string());
}
info!("Feed channel {} created successfully", channel_id);
Ok(channel_id)
}
ResponseType::Error(error) => Err(DXLinkError::Protocol(format!(
"Server returned error: {}",
error
))),
_ => Err(DXLinkError::Protocol(
"Unexpected response type".to_string(),
)),
}
}
pub async fn setup_feed(
&mut self,
channel_id: u32,
event_types: &[EventType],
) -> DXLinkResult<()> {
self.validate_channel(channel_id, "FEED")?;
let mut accept_event_fields = HashMap::new();
for event_type in event_types {
let fields = match event_type {
EventType::Quote => vec![
"eventType".to_string(),
"eventSymbol".to_string(),
"bidPrice".to_string(),
"askPrice".to_string(),
"bidSize".to_string(),
"askSize".to_string(),
],
EventType::Trade => vec![
"eventType".to_string(),
"eventSymbol".to_string(),
"price".to_string(),
"size".to_string(),
"dayVolume".to_string(),
],
EventType::Greeks => vec![
"eventType".to_string(),
"eventSymbol".to_string(),
"delta".to_string(),
"gamma".to_string(),
"theta".to_string(),
"vega".to_string(),
"rho".to_string(),
"volatility".to_string(),
],
_ => vec!["eventType".to_string(), "eventSymbol".to_string()],
};
accept_event_fields.insert(event_type.to_string(), fields);
}
let feed_setup = FeedSetupMessage {
channel: channel_id,
message_type: "FEED_SETUP".to_string(),
accept_aggregation_period: 0.1,
accept_data_format: "COMPACT".to_string(),
accept_event_fields,
};
let json = serde_json::to_string(&feed_setup)?;
debug!("Sending FEED_SETUP: {}", json);
let (tx, rx) = oneshot::channel();
{
let mut requests = self.response_requests.lock().unwrap();
requests.push(ResponseRequest {
expected_type: "FEED_CONFIG".to_string(),
channel_id: Some(channel_id),
response_sender: tx,
});
}
let conn = self.get_connection_mut()?;
conn.send(&feed_setup).await?;
let response = match tokio::time::timeout(Duration::from_secs(10), rx).await {
Ok(Ok(response)) => response,
Ok(Err(_)) => return Err(DXLinkError::Protocol("Response channel closed".to_string())),
Err(_) => {
return Err(DXLinkError::Timeout(format!(
"Timed out waiting for FEED_CONFIG message for channel {}",
channel_id
)));
}
};
match response {
ResponseType::FeedConfig(received_channel) => {
if received_channel != channel_id {
return Err(DXLinkError::Channel(format!(
"Expected config for channel {}, got {}",
channel_id, received_channel
)));
}
info!("Feed channel {} setup completed successfully", channel_id);
Ok(())
}
ResponseType::Error(error) => Err(DXLinkError::Protocol(format!(
"Server returned error: {}",
error
))),
_ => Err(DXLinkError::Protocol(
"Unexpected response type".to_string(),
)),
}
}
pub async fn subscribe(
&mut self,
channel_id: u32,
subscriptions: Vec<FeedSubscription>,
) -> DXLinkResult<()> {
self.validate_channel(channel_id, "FEED")?;
{
let mut subs = self.subscriptions.lock().unwrap();
for sub in &subscriptions {
subs.insert((EventType::from(sub.event_type.as_str()), sub.symbol.clone()));
}
}
let subscription_msg = FeedSubscriptionMessage {
channel: channel_id,
message_type: "FEED_SUBSCRIPTION".to_string(),
add: Some(subscriptions),
remove: None,
reset: None,
};
let conn = self.get_connection_mut()?;
conn.send(&subscription_msg).await?;
info!("Subscriptions added to channel {}", channel_id);
Ok(())
}
pub async fn unsubscribe(
&mut self,
channel_id: u32,
subscriptions: Vec<FeedSubscription>,
) -> DXLinkResult<()> {
self.validate_channel(channel_id, "FEED")?;
{
let mut subs = self.subscriptions.lock().unwrap();
for sub in &subscriptions {
subs.remove(&(EventType::from(sub.event_type.as_str()), sub.symbol.clone()));
}
}
let subscription_msg = FeedSubscriptionMessage {
channel: channel_id,
message_type: "FEED_SUBSCRIPTION".to_string(),
add: None,
remove: Some(subscriptions),
reset: None,
};
let conn = self.get_connection_mut()?;
conn.send(&subscription_msg).await?;
info!("Subscriptions removed from channel {}", channel_id);
Ok(())
}
pub async fn reset_subscriptions(&mut self, channel_id: u32) -> DXLinkResult<()> {
self.validate_channel(channel_id, "FEED")?;
{
let mut subs = self.subscriptions.lock().unwrap();
subs.clear(); }
let subscription_msg = FeedSubscriptionMessage {
channel: channel_id,
message_type: "FEED_SUBSCRIPTION".to_string(),
add: None,
remove: None,
reset: Some(true),
};
let conn = self.get_connection_mut()?;
conn.send(&subscription_msg).await?;
info!("All subscriptions reset on channel {}", channel_id);
Ok(())
}
pub async fn close_channel(&mut self, channel_id: u32) -> DXLinkResult<()> {
{
let channels = self.channels.lock().unwrap();
if !channels.contains_key(&channel_id) {
return Err(DXLinkError::Channel(format!(
"Channel {} not found",
channel_id
)));
}
}
let cancel_msg = BaseMessage {
channel: channel_id,
message_type: "CHANNEL_CANCEL".to_string(),
};
let (tx, rx) = oneshot::channel();
{
let mut requests = self.response_requests.lock().unwrap();
requests.push(ResponseRequest {
expected_type: "CHANNEL_CLOSED".to_string(),
channel_id: Some(channel_id),
response_sender: tx,
});
}
let conn = self.get_connection_mut()?;
conn.send(&cancel_msg).await?;
let response = match tokio::time::timeout(Duration::from_secs(5), rx).await {
Ok(Ok(response)) => response,
Ok(Err(_)) => return Err(DXLinkError::Protocol("Response channel closed".to_string())),
Err(_) => {
return Err(DXLinkError::Timeout(format!(
"Timed out waiting for CHANNEL_CLOSED message for channel {}",
channel_id
)));
}
};
match response {
ResponseType::ChannelClosed(received_channel) => {
if received_channel != channel_id {
return Err(DXLinkError::Channel(format!(
"Expected CHANNEL_CLOSED for channel {}, got {}",
channel_id, received_channel
)));
}
{
let mut channels = self.channels.lock().unwrap();
channels.remove(&channel_id);
}
info!("Channel {} closed successfully", channel_id);
Ok(())
}
ResponseType::Error(error) => Err(DXLinkError::Protocol(format!(
"Server returned error: {}",
error
))),
_ => Err(DXLinkError::Protocol(
"Unexpected response type".to_string(),
)),
}
}
pub fn on_event(&self, symbol: &str, callback: impl Fn(MarketEvent) + Send + Sync + 'static) {
let mut callbacks = self.callbacks.lock().unwrap();
callbacks.insert(symbol.to_string(), Box::new(callback));
}
pub fn event_stream(&mut self) -> DXLinkResult<Receiver<MarketEvent>> {
if self.event_sender.is_none() {
let (tx, rx) = mpsc::channel(100); self.event_sender = Some(tx);
Ok(rx)
} else {
Err(DXLinkError::Protocol(
"Event stream already created".to_string(),
))
}
}
fn next_channel_id(&self) -> DXLinkResult<u32> {
let mut id = self.next_channel_id.lock().unwrap();
let channel_id = *id;
*id += 1;
Ok(channel_id)
}
fn get_connection_mut(&mut self) -> DXLinkResult<&mut WebSocketConnection> {
self.connection
.as_mut()
.ok_or_else(|| DXLinkError::Connection("Not connected to DXLink server".to_string()))
}
fn validate_channel(&self, channel_id: u32, expected_service: &str) -> DXLinkResult<()> {
let channels = self.channels.lock().unwrap();
match channels.get(&channel_id) {
Some(service) if service == expected_service => Ok(()),
Some(service) => Err(DXLinkError::Channel(format!(
"Channel {} is a {} channel, not a {} channel",
channel_id, service, expected_service
))),
None => Err(DXLinkError::Channel(format!(
"Channel {} not found",
channel_id
))),
}
}
}
impl fmt::Debug for DXLinkClient {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug_struct = f.debug_struct("DXLinkClient");
debug_struct.field("url", &self.url);
debug_struct.field("has_token", &(!self.token.is_empty()));
debug_struct.field("connected", &self.connection.is_some());
debug_struct.field("keepalive_timeout", &self.keepalive_timeout);
let channel_count = if let Ok(channels) = self.channels.lock() {
channels.len()
} else {
0
};
debug_struct.field("channel_count", &channel_count);
let callback_count = if let Ok(callbacks) = self.callbacks.lock() {
callbacks.len()
} else {
0
};
debug_struct.field("callback_count", &callback_count);
let subscription_count = if let Ok(subscriptions) = self.subscriptions.lock() {
subscriptions.len()
} else {
0
};
debug_struct.field("subscription_count", &subscription_count);
debug_struct.field("has_event_sender", &self.event_sender.is_some());
debug_struct.field("keepalive_active", &self.keepalive_handle.is_some());
debug_struct.field("message_handler_active", &self.message_handle.is_some());
let pending_responses = if let Ok(requests) = self.response_requests.lock() {
requests.len()
} else {
0
};
debug_struct.field("pending_responses", &pending_responses);
debug_struct.finish()
}
}
impl fmt::Display for DXLinkClient {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"DXLink Client [{}]",
if self.connection.is_some() {
"Connected"
} else {
"Disconnected"
}
)?;
write!(f, " to {}", self.url)?;
let channel_count = self.channels.lock().map(|c| c.len()).unwrap_or(0);
let subscription_count = self.subscriptions.lock().map(|s| s.len()).unwrap_or(0);
write!(
f,
" | Channels: {}, Subscriptions: {}",
channel_count, subscription_count
)?;
let tasks_status = match (
self.message_handle.is_some(),
self.keepalive_handle.is_some(),
) {
(true, true) => "All tasks running",
(true, false) => "Message handler only",
(false, true) => "Keepalive only",
(false, false) => "No tasks running",
};
write!(f, " | {}", tasks_status)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::events::QuoteEvent;
#[test]
fn test_new_client() {
let client = DXLinkClient::new("wss://test.url", "test_token");
assert_eq!(client.url, "wss://test.url");
assert_eq!(client.token, "test_token");
assert_eq!(client.keepalive_timeout, DEFAULT_KEEPALIVE_TIMEOUT);
assert!(client.connection.is_none());
assert!(client.event_sender.is_none());
assert!(client.keepalive_handle.is_none());
assert!(client.message_handle.is_none());
assert!(client.keepalive_sender.is_none());
}
#[test]
fn test_next_channel_id() {
let client = DXLinkClient::new("wss://test.url", "test_token");
let id1 = client.next_channel_id().unwrap();
let id2 = client.next_channel_id().unwrap();
assert_eq!(id2, id1 + 1);
}
#[test]
fn test_validate_channel() {
let client = DXLinkClient::new("wss://test.url", "test_token");
{
let mut channels = client.channels.lock().unwrap();
channels.insert(1, "FEED".to_string());
channels.insert(2, "OTHER".to_string());
}
let result = client.validate_channel(1, "FEED");
assert!(result.is_ok());
let result = client.validate_channel(1, "OTHER");
assert!(result.is_err());
match result {
Err(DXLinkError::Channel(_)) => {}
_ => panic!("Expected Channel error"),
}
let result = client.validate_channel(3, "FEED");
assert!(result.is_err());
match result {
Err(DXLinkError::Channel(_)) => {}
_ => panic!("Expected Channel error"),
}
}
#[test]
fn test_on_event() {
let client = DXLinkClient::new("wss://test.url", "test_token");
let called = Arc::new(Mutex::new(false));
let called_clone = called.clone();
client.on_event("AAPL", move |_| {
let mut called = called_clone.lock().unwrap();
*called = true;
});
let callbacks = client.callbacks.lock().unwrap();
assert!(callbacks.contains_key("AAPL"));
if let Some(callback) = callbacks.get("AAPL") {
let quote_event = QuoteEvent {
event_type: "Quote".to_string(),
event_symbol: "AAPL".to_string(),
bid_price: 150.25,
ask_price: 150.50,
bid_size: 100.0,
ask_size: 150.0,
};
callback(MarketEvent::Quote(quote_event));
let called = called.lock().unwrap();
assert!(*called);
} else {
panic!("Callback was not registered");
}
}
#[test]
fn test_event_stream() {
let mut client = DXLinkClient::new("wss://test.url", "test_token");
let result = client.event_stream();
assert!(result.is_ok());
let result = client.event_stream();
assert!(result.is_err());
match result {
Err(DXLinkError::Protocol(msg)) => {
assert!(msg.contains("Event stream already created"));
}
_ => panic!("Expected Protocol error"),
}
}
#[test]
fn test_connection_errors() {
let mut client = DXLinkClient::new("wss://test.url", "test_token");
let result = client.start_keepalive();
assert!(result.is_err());
match result {
Err(DXLinkError::Connection(_)) => {}
_ => panic!("Expected Connection error"),
}
let result = client.start_message_processing();
assert!(result.is_err());
match result {
Err(DXLinkError::Connection(_)) => {}
_ => panic!("Expected Connection error"),
}
let result = client.get_connection_mut();
assert!(result.is_err());
match result {
Err(DXLinkError::Connection(_)) => {}
_ => panic!("Expected Connection error"),
}
}
}