Trait barter_data::Subscriber
source · pub trait Subscriber {
type SubResponse: Validator + DeserializeOwned;
fn base_url() -> &'static str;
fn build_subscription_meta(
subscriptions: &[Subscription]
) -> Result<SubscriptionMeta, SocketError>;
fn subscribe<'life0, 'async_trait>(
subscriptions: &'life0 [Subscription]
) -> Pin<Box<dyn Future<Output = Result<(WebSocket, SubscriptionIds), SocketError>> + Send + 'async_trait>>
where
Self: Send + 'async_trait,
'life0: 'async_trait,
{ ... }
fn validate<'life0, 'async_trait>(
ids: SubscriptionIds,
websocket: &'life0 mut WebSocket,
expected_responses: usize
) -> Pin<Box<dyn Future<Output = Result<SubscriptionIds, SocketError>> + Send + 'async_trait>>
where
Self: Send + 'async_trait,
'life0: 'async_trait,
{ ... }
fn subscription_timeout() -> Duration { ... }
}Expand description
Trait that defines how a subscriber will establish a WebSocket connection with an exchange,
and action Subscriptions. This must be implemented when integrating a new exchange.
Required Associated Types§
sourcetype SubResponse: Validator + DeserializeOwned
type SubResponse: Validator + DeserializeOwned
Deserialisable type that this Subscriber expects to receive from the exchange in
response to Subscription requests. Implements Validator in order to determine
if the SubResponse communicates a successful outcome.
Required Methods§
sourcefn base_url() -> &'static str
fn base_url() -> &'static str
Returns the Base URL of the exchange to establish a connection with.
sourcefn build_subscription_meta(
subscriptions: &[Subscription]
) -> Result<SubscriptionMeta, SocketError>
fn build_subscription_meta(
subscriptions: &[Subscription]
) -> Result<SubscriptionMeta, SocketError>
Uses the provided Barter Subscriptions to build exchange specific subscription
payloads. Generates a SubscriptionIds Hashmap that is used by an ExchangeTransformer
to identify the Barter Subscriptions associated with received messages.
Provided Methods§
sourcefn subscribe<'life0, 'async_trait>(
subscriptions: &'life0 [Subscription]
) -> Pin<Box<dyn Future<Output = Result<(WebSocket, SubscriptionIds), SocketError>> + Send + 'async_trait>>where
Self: Send + 'async_trait,
'life0: 'async_trait,
fn subscribe<'life0, 'async_trait>(
subscriptions: &'life0 [Subscription]
) -> Pin<Box<dyn Future<Output = Result<(WebSocket, SubscriptionIds), SocketError>> + Send + 'async_trait>>where
Self: Send + 'async_trait,
'life0: 'async_trait,
Initialises a WebSocket connection, actions the provided collection of Barter
Subscriptions, and validates that the Subscription were accepted by the exchange.
Examples found in repository?
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
async fn init(subscriptions: &[Subscription]) -> Result<Self, SocketError> {
// Connect & subscribe
let (websocket, ids) = Exchange::subscribe(subscriptions).await?;
// Split WebSocket into WsStream & WsSink components
let (ws_sink, ws_stream) = websocket.split();
// Task to distribute ExchangeTransformer outgoing messages (eg/ custom pongs) to exchange
// --> ExchangeTransformer is operating in a synchronous trait context
// --> ExchangeTransformer sends messages sync via channel to async distribution task
// --> Async distribution tasks forwards the messages to the exchange via the ws_sink
let (ws_sink_tx, ws_sink_rx) = mpsc::unbounded_channel();
tokio::spawn(distribute_responses_to_the_exchange(
Exchange::EXCHANGE,
ws_sink,
ws_sink_rx,
));
// Construct ExchangeTransformer w/ transmitter to WsSink
let transformer = Exchange::new(ws_sink_tx, ids);
Ok(ExchangeWsStream::new(ws_stream, transformer))
}sourcefn validate<'life0, 'async_trait>(
ids: SubscriptionIds,
websocket: &'life0 mut WebSocket,
expected_responses: usize
) -> Pin<Box<dyn Future<Output = Result<SubscriptionIds, SocketError>> + Send + 'async_trait>>where
Self: Send + 'async_trait,
'life0: 'async_trait,
fn validate<'life0, 'async_trait>(
ids: SubscriptionIds,
websocket: &'life0 mut WebSocket,
expected_responses: usize
) -> Pin<Box<dyn Future<Output = Result<SubscriptionIds, SocketError>> + Send + 'async_trait>>where
Self: Send + 'async_trait,
'life0: 'async_trait,
Uses the provided WebSocket connection to consume Subscription responses and
validate their outcomes.
Examples found in repository?
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
async fn subscribe(
subscriptions: &[Subscription],
) -> Result<(WebSocket, SubscriptionIds), SocketError> {
// Connect to exchange
let mut websocket = connect(Self::base_url()).await?;
// Subscribe
let SubscriptionMeta {
ids,
subscriptions,
expected_responses,
} = Self::build_subscription_meta(subscriptions)?;
for subscription in subscriptions {
websocket.send(subscription).await?;
}
// Validate subscriptions
let ids = Self::validate(ids, &mut websocket, expected_responses).await?;
Ok((websocket, ids))
}sourcefn subscription_timeout() -> Duration
fn subscription_timeout() -> Duration
Return the expected Duration in which the exchange will respond to all actioned
WebSocket Subscription requests.
Default: 10 seconds
Examples found in repository?
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
async fn validate(
ids: SubscriptionIds,
websocket: &mut WebSocket,
expected_responses: usize,
) -> Result<SubscriptionIds, SocketError> {
// Establish time limit in which we expect to validate all the Subscriptions
let timeout = Self::subscription_timeout();
// Parameter to keep track of successful Subscription outcomes
let mut success_responses = 0usize;
loop {
// Break if all Subscriptions were a success
if success_responses == expected_responses {
break Ok(ids);
}
tokio::select! {
// If timeout reached, return SubscribeError
_ = tokio::time::sleep(timeout) => {
break Err(SocketError::Subscribe(
format!("subscription validation timeout reached: {:?}", timeout))
)
},
// Parse incoming messages and determine subscription outcomes
message = websocket.next() => match message {
Some(Ok(WsMessage::Text(payload))) => {
if let Ok(response) = serde_json::from_str::<Self::SubResponse>(&payload) {
match response.validate() {
// Subscription success
Ok(_) => { success_responses += 1; }
// Subscription failure
Err(err) => break Err(err)
}
} else {
// Some already active Subscriptions may start coming through
continue;
}
},
Some(Ok(WsMessage::Close(close_frame))) => {
break Err(SocketError::Subscribe(
format!("received WebSocket CloseFrame: {:?}", close_frame))
)
},
_ => continue,
}
}
}
}