use serde::{Deserialize, Serialize};
use time_tz::Tz;
use crate::errors::Error;
use crate::messages::{IncomingMessages, Notice, OutgoingMessages, ResponseMessage};
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum SubscriptionItem<T> {
Data(T),
Notice(Notice),
}
impl<T> SubscriptionItem<T> {
pub fn into_data(self) -> Option<T> {
match self {
SubscriptionItem::Data(t) => Some(t),
SubscriptionItem::Notice(_) => None,
}
}
}
pub(crate) fn filter_notice<T>(item: Result<SubscriptionItem<T>, Error>) -> Option<Result<T, Error>> {
match item {
Ok(SubscriptionItem::Data(t)) => Some(Ok(t)),
Ok(SubscriptionItem::Notice(n)) => {
log::warn!("ib notice on subscription: {n}");
None
}
Err(e) => Some(Err(e)),
}
}
#[derive(Debug, Clone)]
pub(crate) enum RoutedItem {
Response(ResponseMessage),
Notice(Notice),
Error(Error),
}
impl From<ResponseMessage> for RoutedItem {
fn from(message: ResponseMessage) -> Self {
RoutedItem::Response(message)
}
}
impl From<Error> for RoutedItem {
fn from(error: Error) -> Self {
RoutedItem::Error(error)
}
}
impl RoutedItem {
pub(crate) fn into_legacy(self) -> Option<Result<ResponseMessage, Error>> {
match self {
RoutedItem::Response(message) => Some(Ok(message)),
RoutedItem::Error(error) => Some(Err(error)),
RoutedItem::Notice(_) => None,
}
}
}
#[allow(dead_code)]
pub(crate) fn is_stream_end(error: &Error) -> bool {
matches!(error, Error::EndOfStream)
}
#[allow(dead_code)]
pub(crate) fn should_store_error(error: &Error) -> bool {
!is_stream_end(error)
}
#[derive(Debug)]
pub(crate) enum ProcessingResult<T> {
Success(T),
Skip,
Error(Error),
EndOfStream,
}
pub(crate) fn process_decode_result<T>(result: Result<T, Error>) -> ProcessingResult<T> {
match result {
Ok(val) => ProcessingResult::Success(val),
Err(Error::EndOfStream) => ProcessingResult::EndOfStream,
Err(Error::UnexpectedResponse(_)) => ProcessingResult::Skip,
Err(err) => ProcessingResult::Error(err),
}
}
#[derive(Debug, Clone, Default, PartialEq)]
pub struct DecoderContext {
pub server_version: i32,
pub time_zone: Option<&'static Tz>,
pub request_type: Option<OutgoingMessages>,
pub is_smart_depth: bool,
}
impl DecoderContext {
pub fn new(server_version: i32, time_zone: Option<&'static Tz>) -> Self {
Self {
server_version,
time_zone,
request_type: None,
is_smart_depth: false,
}
}
#[allow(dead_code)]
pub fn with_request_type(mut self, request_type: OutgoingMessages) -> Self {
self.request_type = Some(request_type);
self
}
pub fn with_smart_depth(mut self, is_smart_depth: bool) -> Self {
self.is_smart_depth = is_smart_depth;
self
}
}
pub(crate) trait StreamDecoder<T> {
#[allow(dead_code)]
const RESPONSE_MESSAGE_IDS: &'static [IncomingMessages] = &[];
fn decode(context: &DecoderContext, message: &mut ResponseMessage) -> Result<T, Error>;
fn cancel_message(_server_version: i32, _request_id: Option<i32>, _context: Option<&DecoderContext>) -> Result<Vec<u8>, Error> {
Err(Error::NotImplemented)
}
#[allow(unused)]
fn is_snapshot_end(&self) -> bool {
false
}
}
#[cfg(test)]
#[path = "common_tests.rs"]
mod tests;