// hotfix 0.12.0
//
// Buy-side FIX engine written in pure Rust
// Documentation
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
use tracing::debug;

use crate::config::SessionConfig;
use crate::message::{OutboundMessage, RawFixMessage};
use crate::session::Session;
use crate::session::admin_request::AdminRequest;
use crate::session::error::{SendError, SendOutcome, SessionCreationError};
use crate::session::event::{ScheduleResponse, SessionEvent};
use crate::store::MessageStore;
use crate::transport::writer::WriterRef;
use crate::{Application, session};

/// A request to send an outbound message, optionally with confirmation.
///
/// Generic over the message type `M`; in this file it travels over an `mpsc`
/// channel as `OutboundRequest<Outbound>`.
pub(crate) struct OutboundRequest<M> {
    /// The message to be sent.
    pub message: M,
    /// Optional one-shot sender on which the result of the send
    /// (`Ok(SendOutcome)` or `Err(SendError)`) can be reported back.
    /// `None` when no confirmation is wanted.
    pub confirm: Option<oneshot::Sender<Result<SendOutcome, SendError>>>,
}

/// Cloneable handle to a running session task.
///
/// Holds the sending halves of the three bounded channels whose receiving
/// halves are handed to `session::run_session` when the handle is created via
/// `InternalSessionRef::new`.
///
/// NOTE(review): `#[derive(Clone)]` only implements `Clone` when
/// `Outbound: Clone`, even though the senders themselves do not need it —
/// confirm that bound is acceptable for all outbound message types.
#[derive(Clone)]
pub struct InternalSessionRef<Outbound> {
    /// Sender for `SessionEvent`s; every async method on this type goes through it.
    pub(crate) event_sender: mpsc::Sender<SessionEvent>,
    /// Sender for outbound message requests (not used by the methods in this file).
    pub(crate) outbound_message_sender: mpsc::Sender<OutboundRequest<Outbound>>,
    /// Sender for admin requests (not used by the methods in this file).
    pub(crate) admin_request_sender: mpsc::Sender<AdminRequest>,
}

impl<Outbound: OutboundMessage> InternalSessionRef<Outbound> {
    /// Creates a session and starts its task, returning a handle to it.
    ///
    /// Builds a [`Session`] from `config`, `application` and `store`, spawns
    /// [`session::run_session`] for it with `tokio::spawn`, and keeps the
    /// sending halves of the three channels the task reads from.
    ///
    /// # Errors
    ///
    /// Propagates the [`SessionCreationError`] from [`Session::new`]; nothing
    /// is spawned in that case.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` requires a running Tokio runtime, so this must be called
    /// from within one.
    pub fn new(
        config: SessionConfig,
        application: impl Application<Outbound = Outbound>,
        store: impl MessageStore + 'static,
    ) -> Result<Self, SessionCreationError> {
        // Capacities of the bounded channels feeding the session task.
        const EVENT_BUFFER: usize = 100;
        const OUTBOUND_BUFFER: usize = 10;
        const ADMIN_BUFFER: usize = 10;

        let (event_sender, events) = mpsc::channel::<SessionEvent>(EVENT_BUFFER);
        let (outbound_message_sender, outbound_requests) =
            mpsc::channel::<OutboundRequest<Outbound>>(OUTBOUND_BUFFER);
        let (admin_request_sender, admin_requests) = mpsc::channel::<AdminRequest>(ADMIN_BUFFER);

        // A failed construction returns before any task is spawned.
        let session = Session::new(config, application, store)?;
        let session_task = session::run_session(session, events, outbound_requests, admin_requests);
        tokio::spawn(session_task);

        Ok(Self {
            event_sender,
            outbound_message_sender,
            admin_request_sender,
        })
    }

    /// Queues `event` on the event channel, converting a failed send into
    /// [`SessionGone`].
    async fn post_event(&self, event: SessionEvent) -> Result<(), SessionGone> {
        self.event_sender
            .send(event)
            .await
            .map_err(SessionGone::from)
    }

    /// Sends [`SessionEvent::Connected`] carrying `writer` to the session task.
    ///
    /// # Errors
    ///
    /// Returns [`SessionGone`] if the event could not be sent.
    pub async fn register_writer(&self, writer: WriterRef) -> Result<(), SessionGone> {
        self.post_event(SessionEvent::Connected(writer)).await
    }

    /// Sends [`SessionEvent::FixMessageReceived`] carrying `msg` to the session task.
    ///
    /// # Errors
    ///
    /// Returns [`SessionGone`] if the event could not be sent.
    pub async fn new_fix_message_received(&self, msg: RawFixMessage) -> Result<(), SessionGone> {
        self.post_event(SessionEvent::FixMessageReceived(msg)).await
    }

    /// Sends [`SessionEvent::Disconnected`] carrying `reason` to the session task.
    ///
    /// # Errors
    ///
    /// Returns [`SessionGone`] if the event could not be sent.
    pub async fn disconnect(&self, reason: String) -> Result<(), SessionGone> {
        self.post_event(SessionEvent::Disconnected(reason)).await
    }

    /// Sends [`SessionEvent::ShouldReconnect`] and waits for the `bool` reply
    /// on a one-shot channel.
    ///
    /// # Errors
    ///
    /// Returns [`SessionGone`] if the event could not be sent or the reply
    /// channel was closed without an answer.
    pub async fn should_reconnect(&self) -> Result<bool, SessionGone> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.post_event(SessionEvent::ShouldReconnect(reply_tx))
            .await?;
        reply_rx.await.map_err(SessionGone::from)
    }

    /// Sends [`SessionEvent::AwaitSchedule`] and waits for the
    /// [`ScheduleResponse`] reply on a one-shot channel.
    ///
    /// # Errors
    ///
    /// Returns [`SessionGone`] if the event could not be sent or the reply
    /// channel was closed without an answer.
    pub async fn await_in_schedule(&self) -> Result<ScheduleResponse, SessionGone> {
        debug!("awaiting in-schedule time");
        let (reply_tx, reply_rx) = oneshot::channel::<ScheduleResponse>();
        self.post_event(SessionEvent::AwaitSchedule(reply_tx))
            .await?;
        reply_rx.await.map_err(SessionGone::from)
    }
}

/// Error returned by the `InternalSessionRef` methods when an event could not
/// be sent on the event channel or a one-shot reply was never received.
///
/// The wrapped `String` holds the `Display` text of the underlying channel
/// error (see the `From` impls in this file). The `Display` output of this
/// type is always the fixed message below; the wrapped detail is only visible
/// through `Debug`.
#[derive(Debug, Error)]
#[error("session task terminated")]
pub struct SessionGone(String);

impl From<mpsc::error::SendError<SessionEvent>> for SessionGone {
    fn from(err: mpsc::error::SendError<SessionEvent>) -> Self {
        Self(err.to_string())
    }
}

impl From<oneshot::error::RecvError> for SessionGone {
    fn from(err: oneshot::error::RecvError) -> Self {
        Self(err.to_string())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::session::test_utils::create_test_session_ref;

    /// An `InSchedule` reply from the session side is passed through as
    /// `Ok(ScheduleResponse::InSchedule)`.
    #[tokio::test]
    async fn await_in_schedule_returns_in_schedule_when_session_responds_in_schedule() {
        let (handle, mut events) = create_test_session_ref();

        // Stand-in for the session task: the first event must be the schedule
        // request, which is answered immediately.
        let fake_session = async move {
            match events.recv().await {
                Some(SessionEvent::AwaitSchedule(reply_to)) => {
                    reply_to.send(ScheduleResponse::InSchedule).ok();
                }
                other => panic!("unexpected event: {other:?}"),
            }
        };
        tokio::spawn(fake_session);

        let result = handle.await_in_schedule().await;
        assert!(matches!(result, Ok(ScheduleResponse::InSchedule)));
    }

    /// A `Shutdown` reply from the session side is passed through as
    /// `Ok(ScheduleResponse::Shutdown)`.
    #[tokio::test]
    async fn await_in_schedule_returns_shutdown_when_session_responds_shutdown() {
        let (handle, mut events) = create_test_session_ref();

        // Stand-in for the session task, answering with `Shutdown` this time.
        let fake_session = async move {
            match events.recv().await {
                Some(SessionEvent::AwaitSchedule(reply_to)) => {
                    reply_to.send(ScheduleResponse::Shutdown).ok();
                }
                other => panic!("unexpected event: {other:?}"),
            }
        };
        tokio::spawn(fake_session);

        let result = handle.await_in_schedule().await;
        assert!(matches!(result, Ok(ScheduleResponse::Shutdown)));
    }

    /// With the receiving half of the event channel dropped, the call fails
    /// with `SessionGone`.
    #[tokio::test]
    async fn await_in_schedule_returns_err_when_event_channel_closed() {
        let (handle, events) = create_test_session_ref();
        // Drop the receiver before the call so the send has nowhere to go.
        drop(events);

        let result = handle.await_in_schedule().await;
        assert!(matches!(result, Err(SessionGone(_))));
    }
}