use std::{thread, time::Duration};
use quickfix::*;
use utils::*;
mod utils;
fn run<F>(server_kind: FixSocketServerKind, setting_builder: F) -> Result<(), QuickFixError>
where
F: Fn(ServerType, u16) -> Result<SessionSettings, QuickFixError>,
{
let sender = FixRecorder::new(ServerType::Sender.session_id());
let receiver = FixRecorder::new(ServerType::Receiver.session_id());
let communication_port = find_available_port();
let settings_sender = setting_builder(ServerType::Sender, communication_port)?;
let settings_receiver = setting_builder(ServerType::Receiver, communication_port)?;
let log_factory = LogFactory::try_new(&StdLogger::Stdout)?;
let app_sender = Application::try_new(&sender)?;
let app_receiver = Application::try_new(&receiver)?;
let message_store_factory_sender = MemoryMessageStoreFactory::new();
let message_store_factory_receiver = MemoryMessageStoreFactory::new();
assert_eq!(sender.session_created(), 0);
assert_eq!(receiver.session_created(), 0);
assert_eq!(sender.admin_msg_count(), MsgCounter::default());
assert_eq!(sender.user_msg_count(), MsgCounter::default());
assert_eq!(receiver.admin_msg_count(), MsgCounter::default());
assert_eq!(receiver.user_msg_count(), MsgCounter::default());
let mut socket_sender = Initiator::try_new(
&settings_sender,
&app_sender,
&message_store_factory_sender,
&log_factory,
server_kind,
)?;
let mut socket_receiver = Acceptor::try_new(
&settings_receiver,
&app_receiver,
&message_store_factory_receiver,
&log_factory,
server_kind,
)?;
assert_eq!(sender.session_created(), 1);
assert_eq!(receiver.session_created(), 1);
assert!(!sender.is_logged_in());
assert!(!socket_sender.is_logged_on().unwrap());
assert!(!receiver.is_logged_in());
assert!(!socket_receiver.is_logged_on().unwrap());
socket_receiver.start()?;
socket_sender.start()?;
while !sender.is_logged_in() && !receiver.is_logged_in() {
thread::sleep(Duration::from_millis(50));
}
assert!(sender.is_logged_in());
assert!(socket_sender.is_logged_on().unwrap());
assert!(receiver.is_logged_in());
assert!(socket_receiver.is_logged_on().unwrap());
assert_eq!(sender.admin_msg_count(), MsgCounter { recv: 1, sent: 1 });
assert_eq!(receiver.admin_msg_count(), MsgCounter { recv: 1, sent: 1 });
assert_eq!(sender.user_msg_count(), MsgCounter::default());
assert_eq!(receiver.user_msg_count(), MsgCounter::default());
let news = build_news("Hello", &[])?;
send_to_target(news, &ServerType::Sender.session_id())?;
thread::sleep(Duration::from_millis(50));
assert_eq!(sender.user_msg_count(), MsgCounter { sent: 1, recv: 0 });
assert_eq!(receiver.user_msg_count(), MsgCounter { sent: 0, recv: 1 });
let news = build_news(
"Anyone here",
&["This news have", "some content", "that is very interesting"],
)?;
send_to_target(news, &ServerType::Receiver.session_id())?;
thread::sleep(Duration::from_millis(50));
assert_eq!(sender.user_msg_count(), MsgCounter { sent: 1, recv: 1 });
assert_eq!(receiver.user_msg_count(), MsgCounter { sent: 1, recv: 1 });
assert_eq!(
socket_sender
.session(ServerType::Receiver.session_id())
.unwrap_err(),
QuickFixError::SessionNotFound(
"No session found: SessionId(\"FIX.4.4:RECEIVER->SENDER\")".to_string()
)
);
assert_eq!(
socket_receiver
.session(ServerType::Sender.session_id())
.unwrap_err(),
QuickFixError::SessionNotFound(
"No session found: SessionId(\"FIX.4.4:SENDER->RECEIVER\")".to_string()
)
);
let news = build_news("Hello", &[])?;
socket_sender
.session(ServerType::Sender.session_id())?
.send(news)?;
thread::sleep(Duration::from_millis(50));
assert_eq!(sender.user_msg_count(), MsgCounter { sent: 2, recv: 1 });
assert_eq!(receiver.user_msg_count(), MsgCounter { sent: 1, recv: 2 });
let news = build_news(
"Anyone here",
&["This news have", "some content", "that is very interesting"],
)?;
socket_receiver
.session(ServerType::Receiver.session_id())?
.send(news)?;
thread::sleep(Duration::from_millis(50));
assert_eq!(sender.user_msg_count(), MsgCounter { sent: 2, recv: 2 });
assert_eq!(receiver.user_msg_count(), MsgCounter { sent: 2, recv: 2 });
socket_receiver.stop()?;
socket_sender.stop()?;
assert!(!sender.is_logged_in());
assert!(!socket_sender.is_logged_on().unwrap());
assert!(!receiver.is_logged_in());
assert!(!socket_receiver.is_logged_on().unwrap());
assert!(matches!(
sender.admin_msg_count(),
MsgCounter { recv: 2, .. } ));
assert_eq!(receiver.admin_msg_count(), MsgCounter { recv: 2, sent: 2 });
Ok(())
}
#[test]
fn test_full_fix_application_single_thread() -> Result<(), QuickFixError> {
run(FixSocketServerKind::SingleThreaded, build_settings)
}
#[test]
fn test_full_fix_application_multi_thread() -> Result<(), QuickFixError> {
run(FixSocketServerKind::MultiThreaded, build_settings)
}
#[test]
#[cfg(feature = "build-with-ssl")]
fn test_full_fix_application_ssl_single_thread() -> Result<(), QuickFixError> {
run(FixSocketServerKind::SslSingleThreaded, build_ssl_settings)
}
#[test]
#[cfg(feature = "build-with-ssl")]
fn test_full_fix_application_ssl_multi_thread() -> Result<(), QuickFixError> {
run(FixSocketServerKind::SslMultiThreaded, build_ssl_settings)
}