forgefix/lib.rs
1//! An opinionated FIX 4.2 client library for the buy-side.
2//!
3//! ForgeFIX is an engine that implements a subset of the FIX protocol which allows users to connect
4//! to brokers or exchanges to send and receive messages.
5//!
6//! ## Terminology
7//! * `FIX Connection` -- A single connection to a FIX Session. A network connection is made over TCP,
8//! then a FIX logon handshake is performed to establish the FIX connection. The FIX connection
9//! ends properly with a FIX logout, but is considered ended if the TCP connection breaks.
10//! * Note, the term 'connection' is overloaded and can also mean TCP connection. When unclear, a
11//! 'connection' will be specified as TCP or FIX.
12//!
13//! * `FIX Session` -- A conceptual construct that represents the bidirectional stream of ordered
14//! messages between two peers. A FIX Session can live across multiple instances of a FIX
15//! connection.
16//!
17//! * `FIX Engine` -- A sub-process running in the background that manages a single FIX connection
18//! to a FIX Session. The engine starts, runs, and ends the FIX connection as defined by the FIX
19//! protocol, and manages all resources that support the connection.
20//!
21//! ## Examples
22//!
23//! ### Asynchronous API
24//! ```no_run
25//! use forgefix::{
26//! SessionSettings, FixApplicationHandle, FixApplicationInitiator, ApplicationError,
27//! };
28//!
29//! #[tokio::main]
30//! async fn main() -> Result<(), ApplicationError> {
31//!
32//! // build session settings
33//! let settings = SessionSettings::builder()
34//! .with_sender_comp_id("my_id")
35//! .with_target_comp_id("peer_id")
36//! .with_store_path("./store".into())
37//! .with_log_dir("./log".into())
38//! .with_socket_addr("127.0.0.1:0".parse().unwrap())
39//! .build()?;
40//!
41//! // create a FIX engine and intiate TCP connection
42//! let (fix_handle, mut event_receiver) = FixApplicationInitiator::build(settings)?
43//! .initiate()
44//! .await?;
45//!
46//! // handle incoming messages in the background...
47//! tokio::spawn(async move {
48//! while let Some(msg) = event_receiver.recv().await {
49//! println!("got an application message: {}", msg);
50//! }
51//! });
52//!
53//! // start the FIX connection
54//! fix_handle.start_async().await?;
55//!
56//! // send messages here...
57//!
58//! // end the FIX connection
59//! fix_handle.end_async().await?;
60//!
61//! Ok(())
62//! }
63//! ```
64//!
65//! ### Synchronous API*
66//! ```no_run
67//! use forgefix::{
68//! SessionSettings, FixApplicationHandle, FixApplicationInitiator, ApplicationError,
69//! };
70//!
71//! fn main() -> Result<(), ApplicationError> {
72//!
73//! let settings = SessionSettings::builder()
74//! .with_sender_comp_id("my_id")
75//! .with_target_comp_id("peer_id")
76//! .with_store_path("./store".into())
77//! .with_log_dir("./log".into())
78//! .with_socket_addr("127.0.0.1:0".parse().unwrap())
79//! .build()?;
80//!
81//! let (fix_handle, mut event_receiver) = FixApplicationInitiator::build(settings)?
82//! .initiate_sync()?;
83//!
84//! std::thread::spawn(move || {
85//! while let Some(msg) = event_receiver.blocking_recv() {
86//! println!("got an application message: {}", msg);
87//! }
88//! });
89//!
90//! fix_handle.start_sync()?;
91//!
92//! // send messages here...
93//!
94//! fix_handle.end_sync()?;
95//!
96//! Ok(())
97//! }
98//! ```
99//! *When using synchronous API, a tokio runtime is still created internally (see
100//! [`FixApplicationInitiator`])
101//!
102//! ## Feature Flags
103//!
104//! - `sqlite`: Enables the sqlite3 database backed store. Allows messages and
105//! sequences numbers to be persisted to disk, so ForgeFIX can pick up in the middle of a session
106//! if restarted. Otherwise, every restart results in a new FIX session.
107
108pub mod fix;
109use fix::encode::MessageBuilder;
110use fix::mem::MsgBuf;
111
112use std::net::SocketAddr;
113use std::path::PathBuf;
114use std::sync::Arc;
115use std::time::Duration;
116
117use thiserror::Error;
118use tokio::net::{TcpListener, TcpSocket, TcpStream};
119use tokio::sync::{mpsc, oneshot};
120
121use chrono::naive::NaiveTime;
122
123enum Request {
124 Logon {
125 resp_sender: oneshot::Sender<bool>,
126 },
127 SendMessage {
128 resp_sender: oneshot::Sender<bool>,
129 builder: MessageBuilder,
130 },
131 Logout {
132 resp_sender: oneshot::Sender<bool>,
133 },
134}
135
136/// Errors that can occur while running ForgeFIX.
137#[derive(Debug, Error)]
138pub enum ApplicationError {
139 #[error("An I/O error occured: {0}")]
140 IoError(#[from] std::io::Error),
141 #[error("Session ended unexpectedly")]
142 SessionEnded,
143 #[error("Logon has failed")]
144 LogonFailed,
145 #[error("Logout has failed")]
146 LogoutFailed,
147 #[error("MessageSend has failed")]
148 SendMessageFailed,
149 #[error("setting `{0}` is required")]
150 SettingRequired(String),
151}
152
153/// A collection of settings used to configurate a FIX session.
154///
155/// `SessionSettings` can be constructed using the [`SessionSettingsBuilder`], or can be constructed explicitly.
156#[derive(Clone)]
157pub struct SessionSettings {
158 begin_string: Arc<String>,
159 engine_type: FixEngineType,
160 sender_comp_id: String,
161 target_comp_id: String,
162 addr: SocketAddr,
163 epoch: Arc<String>,
164 store_path: PathBuf,
165 log_dir: PathBuf,
166 heartbeat_timeout: Duration,
167 start_time: NaiveTime,
168}
169
170/// A builder for easily configuring all the fields of a [`SessionSettings`]
171///
172/// The following settings are required to be set:
173/// * sender comp id
174/// * target comp id
175/// * addr
176/// * store path
177/// * log dir
178#[derive(Default)]
179pub struct SessionSettingsBuilder {
180 sender_comp_id: Option<String>,
181 target_comp_id: Option<String>,
182 addr: Option<SocketAddr>,
183 begin_string: Option<String>,
184 epoch: Option<String>,
185 store_path: Option<PathBuf>,
186 log_dir: Option<PathBuf>,
187 heartbeat_timeout: Option<Duration>,
188 start_time: Option<NaiveTime>,
189}
190
191impl SessionSettingsBuilder {
192 pub fn new() -> SessionSettingsBuilder {
193 Default::default()
194 }
195
196 /// The time the FIX session starts each day.
197 pub fn with_start_time(mut self, start_time: NaiveTime) -> Self {
198 self.set_start_time(start_time);
199 self
200 }
201 pub fn set_start_time(&mut self, start_time: NaiveTime) {
202 self.start_time = Some(start_time);
203 }
204
205 /// The `SenderCompID(49)` that will be included in each message.
206 pub fn with_sender_comp_id(mut self, sender_comp_id: &str) -> Self {
207 self.set_sender_comp_id(sender_comp_id);
208 self
209 }
210 pub fn set_sender_comp_id(&mut self, sender_comp_id: &str) {
211 self.sender_comp_id = Some(sender_comp_id.to_string());
212 }
213
214 /// The `TargetCompID(56)` that will be included in each message.
215 pub fn with_target_comp_id(mut self, target_comp_id: &str) -> Self {
216 self.set_target_comp_id(target_comp_id);
217 self
218 }
219 pub fn set_target_comp_id(&mut self, target_comp_id: &str) {
220 self.target_comp_id = Some(target_comp_id.to_string());
221 }
222
223 /// The address to initiate a connection to, or accept connections on.
224 pub fn with_socket_addr(mut self, addr: SocketAddr) -> Self {
225 self.addr = Some(addr);
226 self
227 }
228 pub fn set_socket_addr(&mut self, addr: SocketAddr) {
229 self.addr = Some(addr);
230 }
231
232 /// The `BeginString(8)` that will be included in each message.
233 pub fn with_begin_string(mut self, begin_string: &str) -> Self {
234 self.set_begin_string(begin_string);
235 self
236 }
237 pub fn set_begin_string(&mut self, begin_string: &str) {
238 self.begin_string = Some(begin_string.to_string());
239 }
240
241 /// A local unique identifier for this FIX session.
242 pub fn with_epoch(mut self, epoch: &str) -> Self {
243 self.set_epoch(epoch);
244 self
245 }
246 pub fn set_epoch(&mut self, epoch: &str) {
247 self.epoch = Some(epoch.to_string());
248 }
249
250 /// The file that should be used as the sqlite database file.
251 pub fn with_store_path(mut self, store_path: PathBuf) -> Self {
252 self.set_store_path(store_path);
253 self
254 }
255 pub fn set_store_path(&mut self, store_path: PathBuf) {
256 self.store_path = Some(store_path);
257 }
258
259 /// The directory that should be used to create log files.
260 pub fn with_log_dir(mut self, log_dir: PathBuf) -> Self {
261 self.set_log_dir(log_dir);
262 self
263 }
264 pub fn set_log_dir(&mut self, log_dir: PathBuf) {
265 self.log_dir = Some(log_dir);
266 }
267
268 /// The timeout length used for sending `Heartbeat<0>` messages.
269 pub fn with_heartbeat_timeout(mut self, hb_timeout: Duration) -> Self {
270 self.set_heartbeat_timeout(hb_timeout);
271 self
272 }
273 pub fn set_heartbeat_timeout(&mut self, hb_timeout: Duration) {
274 self.heartbeat_timeout = Some(hb_timeout);
275 }
276
277 /// Build the [`SessionSettings`] struct.
278 ///
279 /// Returns an `Err(ApplicationError::SettingRequired)` if not all of the required fields
280 /// were set.
281 pub fn build(self) -> Result<SessionSettings, ApplicationError> {
282 let sender_comp_id = self
283 .sender_comp_id
284 .ok_or(ApplicationError::SettingRequired(
285 "sender_comp_id".to_string(),
286 ))?;
287 let target_comp_id = self
288 .target_comp_id
289 .ok_or(ApplicationError::SettingRequired(
290 "target_comp_id".to_string(),
291 ))?;
292 let addr = self
293 .addr
294 .ok_or(ApplicationError::SettingRequired("addr".to_string()))?;
295 let store_path = self
296 .store_path
297 .ok_or(ApplicationError::SettingRequired("store_path".to_string()))?;
298 let log_dir = self
299 .log_dir
300 .ok_or(ApplicationError::SettingRequired("log_dir".to_string()))?;
301
302 Ok(SessionSettings {
303 engine_type: FixEngineType::Client,
304 begin_string: Arc::new(self.begin_string.unwrap_or(String::from("FIX.4.2"))),
305 epoch: Arc::new(
306 self.epoch
307 .unwrap_or(format!("{}_{}", &sender_comp_id, &target_comp_id)),
308 ),
309 heartbeat_timeout: self.heartbeat_timeout.unwrap_or(Duration::from_secs(30)),
310 start_time: self.start_time.unwrap_or_default(),
311 sender_comp_id,
312 target_comp_id,
313 addr,
314 store_path,
315 log_dir,
316 })
317 }
318}
319
320impl SessionSettings {
321 /// Creates a new [`SessionSettingsBuilder`]
322 pub fn builder() -> SessionSettingsBuilder {
323 SessionSettingsBuilder::new()
324 }
325
326 fn expected_sender_comp_id(&self) -> &str {
327 &self.target_comp_id
328 }
329
330 fn expected_target_comp_id(&self) -> &str {
331 &self.sender_comp_id
332 }
333}
334
335/// A handle on a FIX engine instance.
336///
337/// The [`FixApplicationHandle`] allows for requesting the basic operations of starting the FIX connection, sending
338/// a message to the peer, and ending the connection.
339///
340/// The handle offers asynchronous and synchronous APIs for these operations. As well as functions
341/// that return immedietly with a [`oneshot::Receiver`] that will eventually return the result of the
342/// operation.
343///
344/// The underlying engine could stop running at any moment for a variety of reasons. Only until you
345/// attempt an operation, will you learn the engine has stopped by receiving an
346/// [`ApplicationError::SessionEnded`].
347///
348/// [`FixApplicationHandle`] `impl`'s [`Clone`], [`Send`] and [`Sync`] and therefore multiple
349/// copies of the handle can be made and passed to different threads that can all request messages
350/// to be sent. Only one thread has to call [`end`] for the engine to terminate the connection.
351///
352/// [`oneshot::Receiver`]: https://docs.rs/tokio/latest/tokio/sync/oneshot/struct.Receiver.html
353/// [`end`]: FixApplicationHandle::end
354///
355/// # Example - Multiple Threads
356///
357///```no_run
358/// use forgefix::{
359/// SessionSettings, FixApplicationInitiator, ApplicationError
360/// };
361/// use forgefix::fix::{encode::MessageBuilder, generated::MsgType};
362/// # use anyhow::Result;
363/// # #[tokio::main]
364/// # async fn main() -> Result<()> {
365/// # let settings = SessionSettings::builder()
366/// # .with_sender_comp_id("my_id")
367/// # .with_target_comp_id("peer_id")
368/// # .with_store_path("./store".into())
369/// # .with_log_dir("./log".into())
370/// # .with_socket_addr("127.0.0.1:0".parse().unwrap())
371/// # .build()?;
372///
373/// let (handle, mut receiver) = FixApplicationInitiator::build(settings)?
374/// .initiate()
375/// .await?;
376/// receiver.close();
377///
378/// // FixApplicationHandle can be cloned
379/// let handle1 = handle.clone();
380/// let handle2 = handle.clone();
381///
382/// // FixApplicationHandle clones can be sent across threads and tasks
383/// let h1 = tokio::spawn(async move {
384///
385/// // thread logic here...
386///
387/// let builder = MessageBuilder::new(
388/// &handle1.begin_string(),
389/// MsgType::ORDER_SINGLE.into()
390/// );
391/// handle1.send_message_async(builder).await
392///
393/// // ...
394/// });
395///
396/// // send to multiple tasks...
397/// let h2 = tokio::spawn(async move {
398/// let builder = MessageBuilder::new(
399/// &handle2.begin_string(),
400/// MsgType::ORDER_SINGLE.into()
401/// );
402/// handle2.send_message_async(builder).await
403/// });
404///
405/// // wait for all threads to finish...
406/// let (res1, res2) = tokio::join!(h1, h2);
407/// res1??;
408/// res2??;
409///
410/// // end the FIX connection
411/// handle.end_async().await?;
412/// # Ok(())
413/// # }
414///
415///```
416#[derive(Clone)]
417pub struct FixApplicationHandle {
418 request_sender: mpsc::UnboundedSender<Request>,
419 begin_string: Arc<String>,
420}
421
422impl FixApplicationHandle {
423 /// Send a request to the engine to start the connection and return immediately.
424 ///
425 /// The receiver will eventually yield `true` if a connection was successfully established, or
426 /// `false` othersize.
427 pub fn start(&self) -> Result<oneshot::Receiver<bool>, ApplicationError> {
428 if self.request_sender.is_closed() {
429 return Err(ApplicationError::SessionEnded);
430 }
431 let (resp_sender, resp_receiver) = oneshot::channel();
432 let logon_request = Request::Logon { resp_sender };
433 let _ = self.request_sender.send(logon_request);
434 Ok(resp_receiver)
435 }
436 /// Send a request to the engine to start the connection and await asynchronously.
437 pub async fn start_async(&self) -> Result<(), ApplicationError> {
438 let resp_sender = self.start()?;
439 if Ok(true) != resp_sender.await {
440 return Err(ApplicationError::LogonFailed);
441 }
442 Ok(())
443 }
444 /// Send a request to the engine to start a connection, and block until a result is returned.
445 pub fn start_sync(&self) -> Result<(), ApplicationError> {
446 let resp_receiver = self.start()?;
447 if Ok(true) != resp_receiver.blocking_recv() {
448 return Err(ApplicationError::LogonFailed);
449 }
450 Ok(())
451 }
452
453 /// Send a request to the engine to send the message in the [`MessageBuilder`] to the peer, and return immediately.
454 ///
455 /// If the request was successfully sent to the engine, a [`oneshot::Receiver`] will be
456 /// returned.
457 ///
458 /// The receiver will yield `true` once the message has successfully sent over the TCP
459 /// connection. It will yeild `false` if a message cannot be sent.
460 ///
461 /// [`oneshot::Receiver`]: https://docs.rs/tokio/latest/tokio/sync/oneshot/struct.Receiver.html
462 pub fn send_message(
463 &self,
464 builder: MessageBuilder,
465 ) -> Result<oneshot::Receiver<bool>, ApplicationError> {
466 if self.request_sender.is_closed() {
467 return Err(ApplicationError::SessionEnded);
468 }
469 let (resp_sender, resp_receiver) = oneshot::channel();
470 let send_message_request = Request::SendMessage {
471 resp_sender,
472 builder,
473 };
474 let _ = self.request_sender.send(send_message_request);
475 Ok(resp_receiver)
476 }
477 /// Send a request to the engine to send the message in `builder` and await asynchronously.
478 pub async fn send_message_async(
479 &self,
480 builder: MessageBuilder,
481 ) -> Result<(), ApplicationError> {
482 let resp_sender = self.send_message(builder)?;
483 if Ok(true) != resp_sender.await {
484 return Err(ApplicationError::SendMessageFailed);
485 }
486 Ok(())
487 }
488 /// Send a request to the engine to send the message in `builder` and block until a result is
489 /// returned.
490 pub fn send_message_sync(&self, builder: MessageBuilder) -> Result<(), ApplicationError> {
491 let resp_receiver = self.send_message(builder)?;
492 if Ok(true) != resp_receiver.blocking_recv() {
493 return Err(ApplicationError::SendMessageFailed);
494 }
495 Ok(())
496 }
497
498 /// Send a request to the engine to end the FIX connection, and return immediately.
499 ///
500 /// If the request was successfully send to the engine, a [`oneshot::Receiver`] will be
501 /// returned.
502 ///
503 /// The receiver will yield `true` is the FIX connection is over, and ended without any issues.
504 /// Otherwise it will be `false`.
505 ///
506 /// [`oneshot::Receiver`]: https://docs.rs/tokio/latest/tokio/sync/oneshot/struct.Receiver.html
507 pub fn end(&self) -> Result<oneshot::Receiver<bool>, ApplicationError> {
508 let (resp_sender, resp_receiver) = oneshot::channel();
509 let logout_request = Request::Logout { resp_sender };
510 let _ = self.request_sender.send(logout_request);
511 Ok(resp_receiver)
512 }
513 /// Send a request to the engine to end the FIX connection, and await asynchronously.
514 pub async fn end_async(&self) -> Result<(), ApplicationError> {
515 let resp_sender = self.end()?;
516 if Ok(true) != resp_sender.await {
517 return Err(ApplicationError::LogoutFailed);
518 }
519 Ok(())
520 }
521 /// Send a request to the engine to end the FIX connection, and block until a result is
522 /// returned.
523 pub fn end_sync(&self) -> Result<(), ApplicationError> {
524 let resp_receiver = self.end()?;
525 if Ok(true) != resp_receiver.blocking_recv() {
526 return Err(ApplicationError::LogoutFailed);
527 }
528 Ok(())
529 }
530
531 /// Get the `BeginString(8)` of this FIX Session. Should generally be `"FIX.4.2"`.
532 pub fn begin_string(&self) -> Arc<String> {
533 Arc::clone(&self.begin_string)
534 }
535}
536
537/// A struct that can initiate the TCP connection to the peer and create a FIX engine instance.
538pub struct FixApplicationInitiator {
539 settings: SessionSettings,
540 stream_factory: StreamFactory,
541}
542
543impl FixApplicationInitiator {
544 /// Build a `FixApplicationInitiator` that will create a FIX engine using `settings`.
545 #[allow(clippy::too_many_arguments)]
546 pub fn build(
547 mut settings: SessionSettings,
548 ) -> Result<FixApplicationInitiator, ApplicationError> {
549 settings.engine_type = FixEngineType::Client;
550 let stream_factory = StreamFactory::build(&settings)?;
551 let fix_app_client = FixApplicationInitiator {
552 settings,
553 stream_factory,
554 };
555 Ok(fix_app_client)
556 }
557
558 /// Initiate a TCP connection and start the FIX engine with the current asynchronous runtime.
559 ///
560 /// If the connection is successfully made, a [`FixApplicationHandle`] will be returned, and an
561 /// `UnboundedReceiver<Arc<MsgBuf>>` will be returned.
562 ///
563 /// The application handle can be used to start the FIX connection, send messages and end the
564 /// connection.
565 ///
566 /// The receiver is a channel where all incoming, valid application messages can be received.
567 /// If you do not want to use the channel, it is recommended you call [`close`].
568 ///
569 /// [`close`]: tokio::sync::mpsc::UnboundedReceiver::close
570 pub async fn initiate(
571 self,
572 ) -> Result<(FixApplicationHandle, mpsc::UnboundedReceiver<Arc<MsgBuf>>), ApplicationError>
573 {
574 let stream = self.stream_factory.stream().await?;
575 let (request_sender, request_receiver) = mpsc::unbounded_channel::<Request>();
576 let (app_message_event_sender, app_message_event_receiver) =
577 mpsc::unbounded_channel::<Arc<MsgBuf>>();
578 let begin_string = Arc::clone(&self.settings.begin_string);
579
580 tokio::spawn(async move {
581 if let Err(e) = fix::spin_session(
582 stream,
583 request_receiver,
584 app_message_event_sender,
585 self.settings,
586 )
587 .await
588 {
589 eprintln!("{e:?}");
590 }
591 });
592
593 let handle = FixApplicationHandle {
594 request_sender,
595 begin_string,
596 };
597
598 Ok((handle, app_message_event_receiver))
599 }
600
601 /// Initiate a TCP connection and start the FIX engine that will be driven by `runtime`.
602 pub fn initiate_with_runtime(
603 self,
604 runtime: tokio::runtime::Runtime,
605 ) -> Result<(FixApplicationHandle, mpsc::UnboundedReceiver<Arc<MsgBuf>>), ApplicationError>
606 {
607 let (request_sender, request_receiver) = mpsc::unbounded_channel::<Request>();
608 let (app_message_event_sender, app_message_event_receiver) =
609 mpsc::unbounded_channel::<Arc<MsgBuf>>();
610 let begin_string = Arc::clone(&self.settings.begin_string);
611 let stream = runtime.block_on(self.stream_factory.stream())?;
612
613 std::thread::spawn(move || {
614 if let Err(e) = runtime.block_on(fix::spin_session(
615 stream,
616 request_receiver,
617 app_message_event_sender,
618 self.settings,
619 )) {
620 eprintln!("{e:?}");
621 }
622 });
623 let handle = FixApplicationHandle {
624 request_sender,
625 begin_string,
626 };
627
628 Ok((handle, app_message_event_receiver))
629 }
630
631 /// Initiate a TCP connection, and a runtime will be created internally to drive the engine.
632 pub fn initiate_sync(
633 self,
634 ) -> Result<(FixApplicationHandle, mpsc::UnboundedReceiver<Arc<MsgBuf>>), ApplicationError>
635 {
636 let runtime = tokio::runtime::Builder::new_multi_thread()
637 .enable_all()
638 .build()?;
639 self.initiate_with_runtime(runtime)
640 }
641}
642
643/// A struct that can accept TCP connections, and create a FIX engine instance for each connection.
644pub struct FixApplicationAcceptor {
645 settings: SessionSettings,
646 stream_factory: StreamFactory,
647}
648
649impl FixApplicationAcceptor {
650 /// Build a `FixApplicationAcceptor` from `settings`.
651 #[allow(clippy::too_many_arguments)]
652 pub fn build(
653 mut settings: SessionSettings,
654 ) -> Result<FixApplicationAcceptor, ApplicationError> {
655 settings.engine_type = FixEngineType::Server;
656 let stream_factory = StreamFactory::build(&settings)?;
657 let fix_app_server = FixApplicationAcceptor {
658 settings,
659 stream_factory,
660 };
661 Ok(fix_app_server)
662 }
663
664 /// Accept an incoming TCP connection and create a FIX engine.
665 ///
666 /// Returns the handle to the created engine, and a channel to receive all valid, incoming application
667 /// messages.
668 pub async fn accept(
669 &mut self,
670 ) -> Result<(FixApplicationHandle, mpsc::UnboundedReceiver<Arc<MsgBuf>>), ApplicationError>
671 {
672 let stream = self.stream_factory.stream().await?;
673 let settings = self.settings.clone();
674 let (request_sender, request_receiver) = mpsc::unbounded_channel::<Request>();
675 let (app_message_event_sender, app_message_event_receiver) =
676 mpsc::unbounded_channel::<Arc<MsgBuf>>();
677 let begin_string = Arc::clone(&self.settings.begin_string);
678
679 tokio::task::spawn(async move {
680 if let Err(e) =
681 fix::spin_session(stream, request_receiver, app_message_event_sender, settings)
682 .await
683 {
684 eprintln!("{e:?}");
685 }
686 });
687
688 let handle = FixApplicationHandle {
689 request_sender,
690 begin_string,
691 };
692
693 Ok((handle, app_message_event_receiver))
694 }
695}
696
697#[derive(Clone)]
698enum FixEngineType {
699 Client,
700 Server,
701}
702
703enum StreamFactory {
704 Server(TcpListener),
705 Client(std::net::SocketAddr),
706}
707
708impl StreamFactory {
709 fn build(settings: &SessionSettings) -> Result<Self, std::io::Error> {
710 match settings.engine_type {
711 FixEngineType::Client => Ok(StreamFactory::Client(settings.addr)),
712 FixEngineType::Server => {
713 let socket = TcpSocket::new_v4()?;
714 socket.bind(settings.addr)?;
715 let listener = socket.listen(1024)?;
716 Ok(StreamFactory::Server(listener))
717 }
718 }
719 }
720 async fn stream(&self) -> Result<TcpStream, std::io::Error> {
721 match self {
722 StreamFactory::Server(listener) => {
723 let (stream, _from_addr) = listener.accept().await?;
724 Ok(stream)
725 }
726 StreamFactory::Client(addr) => {
727 let socket = TcpSocket::new_v4()?;
728 Ok(socket.connect(*addr).await?)
729 }
730 }
731 }
732}