use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use crate::client::ClientRequestBuilders;
use crate::subscriptions::Subscription;
use crate::transport::AsyncMessageBus;
use crate::{Client, Error};
use super::common::stream_decoders::DisplayGroupUpdate;
use super::encoders;
/// A subscription to display-group updates that can also send follow-up
/// update requests over the message bus it was created with.
///
/// The wrapped [`Subscription`] is reachable through the `Deref`/`DerefMut`
/// implementations in this file, so its methods can be called directly on
/// this type.
pub struct DisplayGroupSubscription {
/// Underlying subscription that yields `DisplayGroupUpdate` values.
inner: Subscription<DisplayGroupUpdate>,
/// Message bus handle used by `update` to send requests.
message_bus: Arc<dyn AsyncMessageBus>,
}
impl DisplayGroupSubscription {
    /// Sends an update-display-group request for this subscription.
    ///
    /// The request is encoded from the subscription's own request ID and the
    /// supplied `contract_info`, then handed to the message bus.
    ///
    /// # Errors
    ///
    /// Returns any error produced while encoding the request or while sending
    /// it over the message bus.
    ///
    /// # Panics
    ///
    /// Panics if the underlying subscription does not provide a request ID.
    pub async fn update(&self, contract_info: &str) -> Result<(), Error> {
        // A missing request ID is treated as a broken invariant, hence `expect`.
        let id = self
            .inner
            .request_id()
            .expect("subscription has no request ID");

        let message = encoders::encode_update_display_group(id, contract_info)?;

        let bus = &self.message_bus;
        bus.send_message(message).await
    }
}
/// Exposes the wrapped subscription's API directly on
/// `DisplayGroupSubscription`.
impl Deref for DisplayGroupSubscription {
    type Target = Subscription<DisplayGroupUpdate>;

    /// Borrows the wrapped subscription.
    fn deref(&self) -> &Self::Target {
        let Self { inner, .. } = self;
        inner
    }
}
/// Gives mutable access to the wrapped subscription so that its `&mut self`
/// methods can be called on `DisplayGroupSubscription`.
impl DerefMut for DisplayGroupSubscription {
    /// Mutably borrows the wrapped subscription.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { inner, .. } = self;
        inner
    }
}
/// Subscribes to events for the display group identified by `group_id`.
///
/// A subscribe request is encoded with the request ID supplied by the
/// client's request builder and sent through that builder. The resulting
/// subscription is returned together with a clone of the client's message bus
/// handle, which [`DisplayGroupSubscription::update`] uses later.
///
/// # Errors
///
/// Returns any error produced while encoding the request or while sending it.
pub async fn subscribe_to_group_events(client: &Client, group_id: i32) -> Result<DisplayGroupSubscription, Error> {
    let request_builder = client.request();
    let request_id = request_builder.request_id();

    let message = encoders::encode_subscribe_to_group_events(request_id, group_id)?;
    let subscription = request_builder.send::<DisplayGroupUpdate>(message).await?;

    Ok(DisplayGroupSubscription {
        inner: subscription,
        message_bus: client.message_bus.clone(),
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stubs::MessageBusStub;
    use std::sync::{Arc, RwLock};

    /// Builds a message bus stub with no recorded requests and the given
    /// canned responses.
    fn stub_with_responses(responses: &[&str]) -> Arc<MessageBusStub> {
        Arc::new(MessageBusStub {
            request_messages: RwLock::new(vec![]),
            response_messages: responses.iter().map(|response| response.to_string()).collect(),
        })
    }

    /// Subscribing sends one request and yields the decoded update.
    #[tokio::test]
    async fn test_subscribe_to_group_events() {
        let message_bus = stub_with_responses(&["68\x001\x009000\x00265598@SMART\x00"]);
        let client = Client::stubbed(message_bus.clone(), 176);

        let mut subscription = subscribe_to_group_events(&client, 1).await.expect("failed to subscribe");

        // Scope the read guard so it is released before the next `.await`.
        {
            let sent = message_bus.request_messages.read().unwrap();
            assert_eq!(sent.len(), 1);

            let first = &sent[0];
            assert_eq!(first[0], "68");
            assert_eq!(first[1], "1");
            assert_eq!(first[3], "1");
        }

        let received = subscription.next().await;
        assert!(received.is_some());

        let update = received.unwrap().unwrap();
        assert_eq!(update.contract_info, "265598@SMART");
    }

    /// A response without contract info decodes to an empty string.
    #[tokio::test]
    async fn test_subscribe_to_group_events_empty_group() {
        let message_bus = stub_with_responses(&["68\x001\x009000\x00"]);
        let client = Client::stubbed(message_bus, 176);

        let mut subscription = subscribe_to_group_events(&client, 2).await.expect("failed to subscribe");

        let received = subscription.next().await;
        assert!(received.is_some());

        let update = received.unwrap().unwrap();
        assert_eq!(update.contract_info, "");
    }

    /// `update` sends a second request carrying the contract info.
    #[tokio::test]
    async fn test_update_display_group() {
        let message_bus = stub_with_responses(&["68\x001\x009000\x00265598@SMART\x00"]);
        let client = Client::stubbed(message_bus.clone(), 176);

        let subscription = subscribe_to_group_events(&client, 1).await.expect("failed to subscribe");
        subscription.update("265598@SMART").await.expect("update failed");

        let sent = message_bus.request_messages.read().unwrap();
        assert_eq!(sent.len(), 2);

        let second = &sent[1];
        assert_eq!(second[0], "69");
        assert_eq!(second[1], "1");
        assert_eq!(second[3], "265598@SMART");
    }

    /// A response with a different leading message type is not surfaced; the
    /// first yielded update comes from the matching message.
    #[tokio::test]
    async fn test_subscribe_to_group_events_skips_wrong_message_type() {
        let message_bus = stub_with_responses(&[
            "67\x001\x009000\x00wrong message\x00",
            "68\x001\x009000\x00correct message\x00",
        ]);
        let client = Client::stubbed(message_bus, 176);

        let mut subscription = subscribe_to_group_events(&client, 1).await.expect("failed to subscribe");

        let received = subscription.next().await;
        assert!(received.is_some());

        let update = received.unwrap().unwrap();
        assert_eq!(update.contract_info, "correct message");
    }
}