use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use log::{debug, error, warn};
use super::common::{filter_notice, process_decode_result, DecoderContext, ProcessingResult, RoutedItem, SubscriptionItem};
use super::StreamDecoder;
use crate::errors::Error;
use crate::messages::OutgoingMessages;
use crate::transport::{InternalSubscription, MessageBus};
/// Handle to a stream of messages decoded as `T` and delivered through a [`MessageBus`].
///
/// Items are pulled with `next`, `try_next`, `next_timeout`, or one of the iterator
/// adapters. Dropping the subscription calls `cancel`.
#[allow(private_bounds)]
#[must_use = "Subscription must be iterated (via .next(), .iter_data(), or .into_iter()) to receive data; dropping it cancels the request"]
pub struct Subscription<T: StreamDecoder<T>> {
// Decoder context passed to `T::decode` and `T::cancel_message`.
context: DecoderContext,
// Bus used to send the cancellation message.
message_bus: Arc<dyn MessageBus>,
// Request id copied from the internal subscription; first choice for routing a cancel.
request_id: Option<i32>,
// Order id copied from the internal subscription; used for cancel when there is no request id.
order_id: Option<i32>,
// Message type copied from the internal subscription; used for cancel when neither id is set.
message_type: Option<OutgoingMessages>,
// Ties the subscription to the decoded type `T` without storing a value of it.
phantom: PhantomData<T>,
// Set by `cancel` once cancellation has started, so later calls return early.
cancelled: AtomicBool,
// Set when a decoded value reports `is_snapshot_end()`; makes `cancel` a no-op.
snapshot_ended: AtomicBool,
// Set on end-of-stream or on an error; `next`, `try_next` and `next_timeout` then return `None`.
stream_ended: AtomicBool,
// Underlying source of routed items.
subscription: InternalSubscription,
}
/// Outcome of handling one routed item inside the polling loops.
enum NextAction<T> {
// Stop polling and hand this value (possibly `None`) back to the caller.
Return(Option<T>),
// Ignore the item and poll the underlying subscription again.
Skip,
}
#[allow(private_bounds)]
impl<T: StreamDecoder<T>> Subscription<T> {
pub(crate) fn new(message_bus: Arc<dyn MessageBus>, subscription: InternalSubscription, context: DecoderContext) -> Self {
let request_id = subscription.request_id;
let order_id = subscription.order_id;
let message_type = subscription.message_type;
Subscription {
context,
message_bus,
request_id,
order_id,
message_type,
subscription,
phantom: PhantomData,
cancelled: AtomicBool::new(false),
snapshot_ended: AtomicBool::new(false),
stream_ended: AtomicBool::new(false),
}
}
pub fn cancel(&self) {
if self.snapshot_ended.load(Ordering::Relaxed) {
return;
}
if self.cancelled.load(Ordering::Relaxed) {
return;
}
self.cancelled.store(true, Ordering::Relaxed);
if let Some(request_id) = self.request_id {
if let Ok(message) = T::cancel_message(self.context.server_version, self.request_id, Some(&self.context)) {
if let Err(e) = self.message_bus.cancel_subscription(request_id, &message) {
warn!("error cancelling subscription: {e}")
}
self.subscription.cancel();
}
} else if let Some(order_id) = self.order_id {
if let Ok(message) = T::cancel_message(self.context.server_version, self.request_id, Some(&self.context)) {
if let Err(e) = self.message_bus.cancel_order_subscription(order_id, &message) {
warn!("error cancelling order subscription: {e}")
}
self.subscription.cancel();
}
} else if let Some(message_type) = self.message_type {
if let Ok(message) = T::cancel_message(self.context.server_version, self.request_id, Some(&self.context)) {
if let Err(e) = self.message_bus.cancel_shared_subscription(message_type, &message) {
warn!("error cancelling shared subscription: {e}")
}
self.subscription.cancel();
}
} else {
debug!("Could not determine cancel method")
}
}
pub fn request_id(&self) -> Option<i32> {
self.request_id
}
pub fn next(&self) -> Option<Result<SubscriptionItem<T>, Error>> {
if self.stream_ended.load(Ordering::Relaxed) {
return None;
}
loop {
match self.handle_response(self.subscription.next_routed()) {
NextAction::Return(val) => return val,
NextAction::Skip => continue,
}
}
}
fn handle_response(&self, response: Option<RoutedItem>) -> NextAction<Result<SubscriptionItem<T>, Error>> {
match response {
Some(RoutedItem::Response(mut message)) => match process_decode_result(T::decode(&self.context, &mut message)) {
ProcessingResult::Success(val) => {
if val.is_snapshot_end() {
self.snapshot_ended.store(true, Ordering::Relaxed);
}
NextAction::Return(Some(Ok(SubscriptionItem::Data(val))))
}
ProcessingResult::Skip => {
log::trace!("skipping unexpected message on shared channel");
NextAction::Skip
}
ProcessingResult::EndOfStream => {
self.stream_ended.store(true, Ordering::Relaxed);
NextAction::Return(None)
}
ProcessingResult::Error(err) => {
match &err {
Error::Notice(n) => warn!("subscription terminated by TWS error {n}"),
_ => error!("error decoding message: {err}"),
}
self.stream_ended.store(true, Ordering::Relaxed);
NextAction::Return(Some(Err(err)))
}
},
Some(RoutedItem::Notice(notice)) => NextAction::Return(Some(Ok(SubscriptionItem::Notice(notice)))),
Some(RoutedItem::Error(Error::EndOfStream)) => {
self.stream_ended.store(true, Ordering::Relaxed);
NextAction::Return(None)
}
Some(RoutedItem::Error(e)) => {
self.stream_ended.store(true, Ordering::Relaxed);
NextAction::Return(Some(Err(e)))
}
None => NextAction::Return(None),
}
}
pub fn try_next(&self) -> Option<Result<SubscriptionItem<T>, Error>> {
if self.stream_ended.load(Ordering::Relaxed) {
return None;
}
loop {
match self.handle_response(self.subscription.try_next_routed()) {
NextAction::Return(val) => return val,
NextAction::Skip => continue,
}
}
}
pub fn next_timeout(&self, timeout: Duration) -> Option<Result<SubscriptionItem<T>, Error>> {
if self.stream_ended.load(Ordering::Relaxed) {
return None;
}
let deadline = Instant::now() + timeout;
loop {
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
return None;
}
match self.handle_response(self.subscription.next_timeout_routed(remaining)) {
NextAction::Return(val) => return val,
NextAction::Skip => continue,
}
}
}
pub fn next_data(&self) -> Option<Result<T, Error>> {
self.iter_data().next()
}
pub fn iter(&self) -> SubscriptionIter<'_, T> {
SubscriptionIter { subscription: self }
}
pub fn try_iter(&self) -> SubscriptionTryIter<'_, T> {
SubscriptionTryIter { subscription: self }
}
pub fn timeout_iter(&self, timeout: Duration) -> SubscriptionTimeoutIter<'_, T> {
SubscriptionTimeoutIter { subscription: self, timeout }
}
pub fn iter_data(&self) -> FilterData<SubscriptionIter<'_, T>> {
self.iter().filter_data()
}
pub fn try_iter_data(&self) -> FilterData<SubscriptionTryIter<'_, T>> {
self.try_iter().filter_data()
}
pub fn timeout_iter_data(&self, timeout: Duration) -> FilterData<SubscriptionTimeoutIter<'_, T>> {
self.timeout_iter(timeout).filter_data()
}
}
impl<T> Drop for Subscription<T>
where
    T: StreamDecoder<T>,
{
    /// Logs the drop at debug level, then cancels the subscription via `cancel`.
    fn drop(&mut self) {
        debug!("dropping subscription");
        self.cancel();
    }
}
/// Iterator adapter over an iterator of `Result<SubscriptionItem<T>, Error>` that yields
/// `Result<T, Error>`, passing each item through `filter_notice` and dropping those for
/// which it returns `None`.
#[must_use = "iterator adapters are lazy and do nothing unless consumed"]
pub struct FilterData<I> {
// The wrapped iterator.
inner: I,
}
impl<I, T> Iterator for FilterData<I>
where
I: Iterator<Item = Result<SubscriptionItem<T>, Error>>,
{
type Item = Result<T, Error>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(out) = filter_notice(self.inner.next()?) {
return Some(out);
}
}
}
}
/// Extension trait that adds `filter_data` to iterators.
pub trait SubscriptionItemIterExt: Iterator + Sized {
/// Wraps `self` in a [`FilterData`] adapter.
///
/// Only callable on iterators whose items are `Result<SubscriptionItem<T>, Error>`.
fn filter_data<T>(self) -> FilterData<Self>
where
Self: Iterator<Item = Result<SubscriptionItem<T>, Error>>,
{
FilterData { inner: self }
}
}
// Blanket implementation: every iterator gets the extension trait; the item-type
// restriction lives on the `filter_data` method itself.
impl<I: Iterator> SubscriptionItemIterExt for I {}
/// Borrowing iterator over a [`Subscription`]; each step calls `Subscription::next`.
///
/// Created by `Subscription::iter` or by iterating over `&Subscription`.
#[allow(private_bounds)]
#[must_use = "iterators are lazy and do nothing unless consumed"]
pub struct SubscriptionIter<'a, T: StreamDecoder<T>> {
// The subscription being iterated.
subscription: &'a Subscription<T>,
}
impl<T: StreamDecoder<T>> Iterator for SubscriptionIter<'_, T> {
type Item = Result<SubscriptionItem<T>, Error>;
fn next(&mut self) -> Option<Self::Item> {
self.subscription.next()
}
}
impl<'a, T: StreamDecoder<T>> IntoIterator for &'a Subscription<T> {
type Item = Result<SubscriptionItem<T>, Error>;
type IntoIter = SubscriptionIter<'a, T>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
/// Owning iterator over a [`Subscription`]; each step calls `Subscription::next`.
///
/// Created by `Subscription::into_iter`. The subscription is dropped together with the
/// iterator.
#[allow(private_bounds)]
#[must_use = "iterators are lazy and do nothing unless consumed"]
pub struct SubscriptionOwnedIter<T: StreamDecoder<T>> {
// The subscription owned by this iterator.
subscription: Subscription<T>,
}
impl<T: StreamDecoder<T>> Iterator for SubscriptionOwnedIter<T> {
type Item = Result<SubscriptionItem<T>, Error>;
fn next(&mut self) -> Option<Self::Item> {
self.subscription.next()
}
}
impl<T> IntoIterator for Subscription<T>
where
    T: StreamDecoder<T>,
{
    type Item = Result<SubscriptionItem<T>, Error>;
    type IntoIter = SubscriptionOwnedIter<T>;

    /// Consumes the subscription and wraps it in an owning iterator.
    fn into_iter(self) -> Self::IntoIter {
        let subscription = self;
        SubscriptionOwnedIter { subscription }
    }
}
/// Borrowing iterator over a [`Subscription`]; each step calls `Subscription::try_next`.
///
/// Created by `Subscription::try_iter`.
#[allow(private_bounds)]
#[must_use = "iterators are lazy and do nothing unless consumed"]
pub struct SubscriptionTryIter<'a, T: StreamDecoder<T>> {
// The subscription being iterated.
subscription: &'a Subscription<T>,
}
impl<T: StreamDecoder<T>> Iterator for SubscriptionTryIter<'_, T> {
type Item = Result<SubscriptionItem<T>, Error>;
fn next(&mut self) -> Option<Self::Item> {
self.subscription.try_next()
}
}
/// Borrowing iterator over a [`Subscription`]; each step calls `Subscription::next_timeout`
/// with the stored timeout.
///
/// Created by `Subscription::timeout_iter`.
#[allow(private_bounds)]
#[must_use = "iterators are lazy and do nothing unless consumed"]
pub struct SubscriptionTimeoutIter<'a, T: StreamDecoder<T>> {
// The subscription being iterated.
subscription: &'a Subscription<T>,
// Timeout passed to `next_timeout` on every step.
timeout: Duration,
}
impl<T: StreamDecoder<T>> Iterator for SubscriptionTimeoutIter<'_, T> {
type Item = Result<SubscriptionItem<T>, Error>;
fn next(&mut self) -> Option<Self::Item> {
self.subscription.next_timeout(self.timeout)
}
}
/// Marker trait: declares no methods or associated items.
// NOTE(review): presumably implemented by types whose subscriptions share a channel —
// confirm against the implementors.
pub trait SharesChannel {}
// Unit tests live in `sync_tests.rs` and are compiled only for test builds with the
// `sync` feature enabled.
#[cfg(all(test, feature = "sync"))]
#[path = "sync_tests.rs"]
mod tests;