forgefix/
lib.rs

1//! An opinionated FIX 4.2 client library for the buy-side.
2//!
3//! ForgeFIX is an engine that implements a subset of the FIX protocol which allows users to connect
4//! to brokers or exchanges to send and receive messages.
5//!
6//! ## Terminology
7//! * `FIX Connection` -- A single connection to a FIX Session. A network connection is made over TCP,
8//! then a FIX logon handshake is performed to establish the FIX connection. The FIX connection
9//! ends properly with a FIX logout, but is considered ended if the TCP connection breaks.
10//!     * Note, the term 'connection' is overloaded and can also mean TCP connection. When unclear, a
11//! 'connection' will be specified as TCP or FIX.
12//!
13//! * `FIX Session` -- A conceptual construct that represents the bidirectional stream of ordered
14//! messages between two peers. A FIX Session can live across multiple instances of a FIX
15//! connection.
16//!
17//! * `FIX Engine` -- A sub-process running in the background that manages a single FIX connection
18//! to a FIX Session. The engine starts, runs, and ends the FIX connection as defined by the FIX
19//! protocol, and manages all resources that support the connection.
20//!
21//! ## Examples
22//!
23//! ### Asynchronous API
24//! ```no_run
25//! use forgefix::{
26//!     SessionSettings, FixApplicationHandle, FixApplicationInitiator, ApplicationError,
27//! };
28//!
29//! #[tokio::main]
30//! async fn main() -> Result<(), ApplicationError> {
31//!     
32//!     // build session settings
33//!     let settings = SessionSettings::builder()
34//!         .with_sender_comp_id("my_id")
35//!         .with_target_comp_id("peer_id")
36//!         .with_store_path("./store".into())
37//!         .with_log_dir("./log".into())
38//!         .with_socket_addr("127.0.0.1:0".parse().unwrap())
39//!         .build()?;
40//!
//!     // create a FIX engine and initiate TCP connection
42//!     let (fix_handle, mut event_receiver) = FixApplicationInitiator::build(settings)?
43//!         .initiate()
44//!         .await?;
45//!
46//!     // handle incoming messages in the background...
47//!     tokio::spawn(async move {
48//!         while let Some(msg) = event_receiver.recv().await {
49//!             println!("got an application message: {}", msg);
50//!         }
51//!     });
52//!
53//!     // start the FIX connection
54//!     fix_handle.start_async().await?;
55//!
56//!     // send messages here...
57//!
58//!     // end the FIX connection
59//!     fix_handle.end_async().await?;
60//!
61//!     Ok(())
62//! }
63//! ```
64//!
65//! ### Synchronous API*
66//! ```no_run
67//! use forgefix::{
68//!     SessionSettings, FixApplicationHandle, FixApplicationInitiator, ApplicationError,
69//! };
70//!
71//! fn main() -> Result<(), ApplicationError> {
72//!
73//!     let settings = SessionSettings::builder()
74//!         .with_sender_comp_id("my_id")
75//!         .with_target_comp_id("peer_id")
76//!         .with_store_path("./store".into())
77//!         .with_log_dir("./log".into())
78//!         .with_socket_addr("127.0.0.1:0".parse().unwrap())
79//!         .build()?;
80//!
81//!     let (fix_handle, mut event_receiver) = FixApplicationInitiator::build(settings)?
82//!         .initiate_sync()?;
83//!
84//!     std::thread::spawn(move || {
85//!         while let Some(msg) = event_receiver.blocking_recv() {
86//!             println!("got an application message: {}", msg);
87//!         }
88//!     });
89//!
90//!     fix_handle.start_sync()?;
91//!
92//!     // send messages here...
93//!
94//!     fix_handle.end_sync()?;
95//!     
96//!     Ok(())
97//! }
98//! ```
99//! *When using synchronous API, a tokio runtime is still created internally (see
100//! [`FixApplicationInitiator`])
101//!
102//! ## Feature Flags
103//!
104//! - `sqlite`: Enables the sqlite3 database backed store. Allows messages and
//! sequence numbers to be persisted to disk, so ForgeFIX can pick up in the middle of a session
106//! if restarted. Otherwise, every restart results in a new FIX session.
107
108pub mod fix;
109use fix::encode::MessageBuilder;
110use fix::mem::MsgBuf;
111
112use std::net::SocketAddr;
113use std::path::PathBuf;
114use std::sync::Arc;
115use std::time::Duration;
116
117use thiserror::Error;
118use tokio::net::{TcpListener, TcpSocket, TcpStream};
119use tokio::sync::{mpsc, oneshot};
120
121use chrono::naive::NaiveTime;
122
/// A request submitted by a [`FixApplicationHandle`] to the engine over the request channel.
///
/// Every variant carries the sending half of a oneshot channel; the handle keeps the receiving
/// half and reads the boolean outcome of the request from it.
enum Request {
    /// Created by [`FixApplicationHandle::start`] to ask the engine to start the FIX connection.
    Logon {
        resp_sender: oneshot::Sender<bool>,
    },
    /// Created by [`FixApplicationHandle::send_message`]; `builder` holds the message to send.
    SendMessage {
        resp_sender: oneshot::Sender<bool>,
        builder: MessageBuilder,
    },
    /// Created by [`FixApplicationHandle::end`] to ask the engine to end the FIX connection.
    Logout {
        resp_sender: oneshot::Sender<bool>,
    },
}
135
136/// Errors that can occur while running ForgeFIX.
137#[derive(Debug, Error)]
138pub enum ApplicationError {
139    #[error("An I/O error occured: {0}")]
140    IoError(#[from] std::io::Error),
141    #[error("Session ended unexpectedly")]
142    SessionEnded,
143    #[error("Logon has failed")]
144    LogonFailed,
145    #[error("Logout has failed")]
146    LogoutFailed,
147    #[error("MessageSend has failed")]
148    SendMessageFailed,
149    #[error("setting `{0}` is required")]
150    SettingRequired(String),
151}
152
/// A collection of settings used to configure a FIX session.
///
/// All fields are private, so a `SessionSettings` is obtained through the
/// [`SessionSettingsBuilder`] (see [`SessionSettings::builder`]).
#[derive(Clone)]
pub struct SessionSettings {
    /// The `BeginString(8)` value; shared with every handle created for the session.
    begin_string: Arc<String>,
    /// Client or server role; overwritten by the `build` function of the initiator / acceptor.
    engine_type: FixEngineType,
    /// The `SenderCompID(49)` of this side.
    sender_comp_id: String,
    /// The `TargetCompID(56)` of the peer.
    target_comp_id: String,
    /// Address to connect to (client) or to bind and listen on (server).
    addr: SocketAddr,
    /// Local unique identifier for the FIX session.
    epoch: Arc<String>,
    /// File used as the sqlite database file.
    store_path: PathBuf,
    /// Directory in which log files are created.
    log_dir: PathBuf,
    /// Timeout length used for sending `Heartbeat<0>` messages.
    heartbeat_timeout: Duration,
    /// Time of day at which the FIX session starts.
    start_time: NaiveTime,
}
169
170/// A builder for easily configuring all the fields of a [`SessionSettings`]
171///
172/// The following settings are required to be set:
173/// * sender comp id
174/// * target comp id
175/// * addr
176/// * store path
177/// * log dir
178#[derive(Default)]
179pub struct SessionSettingsBuilder {
180    sender_comp_id: Option<String>,
181    target_comp_id: Option<String>,
182    addr: Option<SocketAddr>,
183    begin_string: Option<String>,
184    epoch: Option<String>,
185    store_path: Option<PathBuf>,
186    log_dir: Option<PathBuf>,
187    heartbeat_timeout: Option<Duration>,
188    start_time: Option<NaiveTime>,
189}
190
191impl SessionSettingsBuilder {
192    pub fn new() -> SessionSettingsBuilder {
193        Default::default()
194    }
195
196    /// The time the FIX session starts each day.
197    pub fn with_start_time(mut self, start_time: NaiveTime) -> Self {
198        self.set_start_time(start_time);
199        self
200    }
201    pub fn set_start_time(&mut self, start_time: NaiveTime) {
202        self.start_time = Some(start_time);
203    }
204
205    /// The `SenderCompID(49)` that will be included in each message.
206    pub fn with_sender_comp_id(mut self, sender_comp_id: &str) -> Self {
207        self.set_sender_comp_id(sender_comp_id);
208        self
209    }
210    pub fn set_sender_comp_id(&mut self, sender_comp_id: &str) {
211        self.sender_comp_id = Some(sender_comp_id.to_string());
212    }
213
214    /// The `TargetCompID(56)` that will be included in each message.
215    pub fn with_target_comp_id(mut self, target_comp_id: &str) -> Self {
216        self.set_target_comp_id(target_comp_id);
217        self
218    }
219    pub fn set_target_comp_id(&mut self, target_comp_id: &str) {
220        self.target_comp_id = Some(target_comp_id.to_string());
221    }
222
223    /// The address to initiate a connection to, or accept connections on.
224    pub fn with_socket_addr(mut self, addr: SocketAddr) -> Self {
225        self.addr = Some(addr);
226        self
227    }
228    pub fn set_socket_addr(&mut self, addr: SocketAddr) {
229        self.addr = Some(addr);
230    }
231
232    /// The `BeginString(8)` that will be included in each message.
233    pub fn with_begin_string(mut self, begin_string: &str) -> Self {
234        self.set_begin_string(begin_string);
235        self
236    }
237    pub fn set_begin_string(&mut self, begin_string: &str) {
238        self.begin_string = Some(begin_string.to_string());
239    }
240
241    /// A local unique identifier for this FIX session.
242    pub fn with_epoch(mut self, epoch: &str) -> Self {
243        self.set_epoch(epoch);
244        self
245    }
246    pub fn set_epoch(&mut self, epoch: &str) {
247        self.epoch = Some(epoch.to_string());
248    }
249
250    /// The file that should be used as the sqlite database file.
251    pub fn with_store_path(mut self, store_path: PathBuf) -> Self {
252        self.set_store_path(store_path);
253        self
254    }
255    pub fn set_store_path(&mut self, store_path: PathBuf) {
256        self.store_path = Some(store_path);
257    }
258
259    /// The directory that should be used to create log files.
260    pub fn with_log_dir(mut self, log_dir: PathBuf) -> Self {
261        self.set_log_dir(log_dir);
262        self
263    }
264    pub fn set_log_dir(&mut self, log_dir: PathBuf) {
265        self.log_dir = Some(log_dir);
266    }
267
268    /// The timeout length used for sending `Heartbeat<0>` messages.
269    pub fn with_heartbeat_timeout(mut self, hb_timeout: Duration) -> Self {
270        self.set_heartbeat_timeout(hb_timeout);
271        self
272    }
273    pub fn set_heartbeat_timeout(&mut self, hb_timeout: Duration) {
274        self.heartbeat_timeout = Some(hb_timeout);
275    }
276
277    /// Build the [`SessionSettings`] struct.
278    ///
279    /// Returns an `Err(ApplicationError::SettingRequired)` if not all of the required fields
280    /// were set.
281    pub fn build(self) -> Result<SessionSettings, ApplicationError> {
282        let sender_comp_id = self
283            .sender_comp_id
284            .ok_or(ApplicationError::SettingRequired(
285                "sender_comp_id".to_string(),
286            ))?;
287        let target_comp_id = self
288            .target_comp_id
289            .ok_or(ApplicationError::SettingRequired(
290                "target_comp_id".to_string(),
291            ))?;
292        let addr = self
293            .addr
294            .ok_or(ApplicationError::SettingRequired("addr".to_string()))?;
295        let store_path = self
296            .store_path
297            .ok_or(ApplicationError::SettingRequired("store_path".to_string()))?;
298        let log_dir = self
299            .log_dir
300            .ok_or(ApplicationError::SettingRequired("log_dir".to_string()))?;
301
302        Ok(SessionSettings {
303            engine_type: FixEngineType::Client,
304            begin_string: Arc::new(self.begin_string.unwrap_or(String::from("FIX.4.2"))),
305            epoch: Arc::new(
306                self.epoch
307                    .unwrap_or(format!("{}_{}", &sender_comp_id, &target_comp_id)),
308            ),
309            heartbeat_timeout: self.heartbeat_timeout.unwrap_or(Duration::from_secs(30)),
310            start_time: self.start_time.unwrap_or_default(),
311            sender_comp_id,
312            target_comp_id,
313            addr,
314            store_path,
315            log_dir,
316        })
317    }
318}
319
320impl SessionSettings {
321    /// Creates a new [`SessionSettingsBuilder`]
322    pub fn builder() -> SessionSettingsBuilder {
323        SessionSettingsBuilder::new()
324    }
325
326    fn expected_sender_comp_id(&self) -> &str {
327        &self.target_comp_id
328    }
329
330    fn expected_target_comp_id(&self) -> &str {
331        &self.sender_comp_id
332    }
333}
334
335/// A handle on a FIX engine instance.
336///
337/// The [`FixApplicationHandle`] allows for requesting the basic operations of starting the FIX connection, sending
338/// a message to the peer, and ending the connection.
339///
/// The handle offers asynchronous and synchronous APIs for these operations, as well as functions
/// that return immediately with a [`oneshot::Receiver`] that will eventually return the result of the
/// operation.
///
/// The underlying engine could stop running at any moment for a variety of reasons. You will only
/// learn that the engine has stopped when you attempt an operation and receive an
/// [`ApplicationError::SessionEnded`].
347///
348/// [`FixApplicationHandle`] `impl`'s [`Clone`], [`Send`] and [`Sync`] and therefore multiple
349/// copies of the handle can be made and passed to different threads that can all request messages
350/// to be sent. Only one thread has to call [`end`] for the engine to terminate the connection.
351///
352/// [`oneshot::Receiver`]: https://docs.rs/tokio/latest/tokio/sync/oneshot/struct.Receiver.html
353/// [`end`]: FixApplicationHandle::end
354///
355/// # Example - Multiple Threads
356///
357///```no_run
358/// use forgefix::{
359///     SessionSettings, FixApplicationInitiator, ApplicationError
360/// };
361/// use forgefix::fix::{encode::MessageBuilder, generated::MsgType};
362/// # use anyhow::Result;
363/// # #[tokio::main]
364/// # async fn main() -> Result<()> {
365/// #    let settings = SessionSettings::builder()
366/// #        .with_sender_comp_id("my_id")
367/// #        .with_target_comp_id("peer_id")
368/// #        .with_store_path("./store".into())
369/// #        .with_log_dir("./log".into())
370/// #        .with_socket_addr("127.0.0.1:0".parse().unwrap())
371/// #        .build()?;
372///
373/// let (handle, mut receiver) = FixApplicationInitiator::build(settings)?
374///     .initiate()
375///     .await?;
376/// receiver.close();
377///
378/// // FixApplicationHandle can be cloned
379/// let handle1 = handle.clone();
380/// let handle2 = handle.clone();
381///
382/// // FixApplicationHandle clones can be sent across threads and tasks
383/// let h1 = tokio::spawn(async move {
384///
385///     // thread logic here...
386///
387///     let builder = MessageBuilder::new(
388///         &handle1.begin_string(),
389///         MsgType::ORDER_SINGLE.into()
390///     );
391///     handle1.send_message_async(builder).await
392///
393///     // ...
394/// });
395///
396/// // send to multiple tasks...
397/// let h2 = tokio::spawn(async move {
398///     let builder = MessageBuilder::new(
399///         &handle2.begin_string(),
400///         MsgType::ORDER_SINGLE.into()
401///     );
402///     handle2.send_message_async(builder).await
403/// });
404///
405/// // wait for all threads to finish...
406/// let (res1, res2) = tokio::join!(h1, h2);
407/// res1??;
408/// res2??;
409///     
410/// // end the FIX connection
411/// handle.end_async().await?;
412///  #   Ok(())
413/// # }
414///
415///```
416#[derive(Clone)]
417pub struct FixApplicationHandle {
418    request_sender: mpsc::UnboundedSender<Request>,
419    begin_string: Arc<String>,
420}
421
422impl FixApplicationHandle {
423    /// Send a request to the engine to start the connection and return immediately.
424    ///
425    /// The receiver will eventually yield `true` if a connection was successfully established, or
426    /// `false` othersize.
427    pub fn start(&self) -> Result<oneshot::Receiver<bool>, ApplicationError> {
428        if self.request_sender.is_closed() {
429            return Err(ApplicationError::SessionEnded);
430        }
431        let (resp_sender, resp_receiver) = oneshot::channel();
432        let logon_request = Request::Logon { resp_sender };
433        let _ = self.request_sender.send(logon_request);
434        Ok(resp_receiver)
435    }
436    /// Send a request to the engine to start the connection and await asynchronously.
437    pub async fn start_async(&self) -> Result<(), ApplicationError> {
438        let resp_sender = self.start()?;
439        if Ok(true) != resp_sender.await {
440            return Err(ApplicationError::LogonFailed);
441        }
442        Ok(())
443    }
444    /// Send a request to the engine to start a connection, and block until a result is returned.
445    pub fn start_sync(&self) -> Result<(), ApplicationError> {
446        let resp_receiver = self.start()?;
447        if Ok(true) != resp_receiver.blocking_recv() {
448            return Err(ApplicationError::LogonFailed);
449        }
450        Ok(())
451    }
452
453    /// Send a request to the engine to send the message in the [`MessageBuilder`] to the peer, and return immediately.
454    ///
455    /// If the request was successfully sent to the engine, a [`oneshot::Receiver`] will be
456    /// returned.
457    ///
458    /// The receiver will yield `true` once the message has successfully sent over the TCP
459    /// connection. It will yeild `false` if a message cannot be sent.
460    ///
461    /// [`oneshot::Receiver`]: https://docs.rs/tokio/latest/tokio/sync/oneshot/struct.Receiver.html
462    pub fn send_message(
463        &self,
464        builder: MessageBuilder,
465    ) -> Result<oneshot::Receiver<bool>, ApplicationError> {
466        if self.request_sender.is_closed() {
467            return Err(ApplicationError::SessionEnded);
468        }
469        let (resp_sender, resp_receiver) = oneshot::channel();
470        let send_message_request = Request::SendMessage {
471            resp_sender,
472            builder,
473        };
474        let _ = self.request_sender.send(send_message_request);
475        Ok(resp_receiver)
476    }
477    /// Send a request to the engine to send the message in `builder` and await asynchronously.
478    pub async fn send_message_async(
479        &self,
480        builder: MessageBuilder,
481    ) -> Result<(), ApplicationError> {
482        let resp_sender = self.send_message(builder)?;
483        if Ok(true) != resp_sender.await {
484            return Err(ApplicationError::SendMessageFailed);
485        }
486        Ok(())
487    }
488    /// Send a request to the engine to send the message in `builder` and block until a result is
489    /// returned.
490    pub fn send_message_sync(&self, builder: MessageBuilder) -> Result<(), ApplicationError> {
491        let resp_receiver = self.send_message(builder)?;
492        if Ok(true) != resp_receiver.blocking_recv() {
493            return Err(ApplicationError::SendMessageFailed);
494        }
495        Ok(())
496    }
497
498    /// Send a request to the engine to end the FIX connection, and return immediately.
499    ///
500    /// If the request was successfully send to the engine, a [`oneshot::Receiver`] will be
501    /// returned.
502    ///
503    /// The receiver will yield `true` is the FIX connection is over, and ended without any issues.
504    /// Otherwise it will be `false`.
505    ///
506    /// [`oneshot::Receiver`]: https://docs.rs/tokio/latest/tokio/sync/oneshot/struct.Receiver.html
507    pub fn end(&self) -> Result<oneshot::Receiver<bool>, ApplicationError> {
508        let (resp_sender, resp_receiver) = oneshot::channel();
509        let logout_request = Request::Logout { resp_sender };
510        let _ = self.request_sender.send(logout_request);
511        Ok(resp_receiver)
512    }
513    /// Send a request to the engine to end the FIX connection, and await asynchronously.
514    pub async fn end_async(&self) -> Result<(), ApplicationError> {
515        let resp_sender = self.end()?;
516        if Ok(true) != resp_sender.await {
517            return Err(ApplicationError::LogoutFailed);
518        }
519        Ok(())
520    }
521    /// Send a request to the engine to end the FIX connection, and block until a result is
522    /// returned.
523    pub fn end_sync(&self) -> Result<(), ApplicationError> {
524        let resp_receiver = self.end()?;
525        if Ok(true) != resp_receiver.blocking_recv() {
526            return Err(ApplicationError::LogoutFailed);
527        }
528        Ok(())
529    }
530
531    /// Get the `BeginString(8)` of this FIX Session. Should generally be `"FIX.4.2"`.
532    pub fn begin_string(&self) -> Arc<String> {
533        Arc::clone(&self.begin_string)
534    }
535}
536
537/// A struct that can initiate the TCP connection to the peer and create a FIX engine instance.
538pub struct FixApplicationInitiator {
539    settings: SessionSettings,
540    stream_factory: StreamFactory,
541}
542
543impl FixApplicationInitiator {
544    /// Build a `FixApplicationInitiator` that will create a FIX engine using `settings`.
545    #[allow(clippy::too_many_arguments)]
546    pub fn build(
547        mut settings: SessionSettings,
548    ) -> Result<FixApplicationInitiator, ApplicationError> {
549        settings.engine_type = FixEngineType::Client;
550        let stream_factory = StreamFactory::build(&settings)?;
551        let fix_app_client = FixApplicationInitiator {
552            settings,
553            stream_factory,
554        };
555        Ok(fix_app_client)
556    }
557
558    /// Initiate a TCP connection and start the FIX engine with the current asynchronous runtime.
559    ///
560    /// If the connection is successfully made, a [`FixApplicationHandle`] will be returned, and an
561    /// `UnboundedReceiver<Arc<MsgBuf>>` will be returned.
562    ///
563    /// The application handle can be used to start the FIX connection, send messages and end the
564    /// connection.
565    ///
566    /// The receiver is a channel where all incoming, valid application messages can be received.
567    /// If you do not want to use the channel, it is recommended you call [`close`].
568    ///
569    /// [`close`]: tokio::sync::mpsc::UnboundedReceiver::close
570    pub async fn initiate(
571        self,
572    ) -> Result<(FixApplicationHandle, mpsc::UnboundedReceiver<Arc<MsgBuf>>), ApplicationError>
573    {
574        let stream = self.stream_factory.stream().await?;
575        let (request_sender, request_receiver) = mpsc::unbounded_channel::<Request>();
576        let (app_message_event_sender, app_message_event_receiver) =
577            mpsc::unbounded_channel::<Arc<MsgBuf>>();
578        let begin_string = Arc::clone(&self.settings.begin_string);
579
580        tokio::spawn(async move {
581            if let Err(e) = fix::spin_session(
582                stream,
583                request_receiver,
584                app_message_event_sender,
585                self.settings,
586            )
587            .await
588            {
589                eprintln!("{e:?}");
590            }
591        });
592
593        let handle = FixApplicationHandle {
594            request_sender,
595            begin_string,
596        };
597
598        Ok((handle, app_message_event_receiver))
599    }
600
601    /// Initiate a TCP connection and start the FIX engine that will be driven by `runtime`.
602    pub fn initiate_with_runtime(
603        self,
604        runtime: tokio::runtime::Runtime,
605    ) -> Result<(FixApplicationHandle, mpsc::UnboundedReceiver<Arc<MsgBuf>>), ApplicationError>
606    {
607        let (request_sender, request_receiver) = mpsc::unbounded_channel::<Request>();
608        let (app_message_event_sender, app_message_event_receiver) =
609            mpsc::unbounded_channel::<Arc<MsgBuf>>();
610        let begin_string = Arc::clone(&self.settings.begin_string);
611        let stream = runtime.block_on(self.stream_factory.stream())?;
612
613        std::thread::spawn(move || {
614            if let Err(e) = runtime.block_on(fix::spin_session(
615                stream,
616                request_receiver,
617                app_message_event_sender,
618                self.settings,
619            )) {
620                eprintln!("{e:?}");
621            }
622        });
623        let handle = FixApplicationHandle {
624            request_sender,
625            begin_string,
626        };
627
628        Ok((handle, app_message_event_receiver))
629    }
630
631    /// Initiate a TCP connection, and a runtime will be created internally to drive the engine.
632    pub fn initiate_sync(
633        self,
634    ) -> Result<(FixApplicationHandle, mpsc::UnboundedReceiver<Arc<MsgBuf>>), ApplicationError>
635    {
636        let runtime = tokio::runtime::Builder::new_multi_thread()
637            .enable_all()
638            .build()?;
639        self.initiate_with_runtime(runtime)
640    }
641}
642
643/// A struct that can accept TCP connections, and create a FIX engine instance for each connection.
644pub struct FixApplicationAcceptor {
645    settings: SessionSettings,
646    stream_factory: StreamFactory,
647}
648
649impl FixApplicationAcceptor {
650    /// Build a `FixApplicationAcceptor` from `settings`.
651    #[allow(clippy::too_many_arguments)]
652    pub fn build(
653        mut settings: SessionSettings,
654    ) -> Result<FixApplicationAcceptor, ApplicationError> {
655        settings.engine_type = FixEngineType::Server;
656        let stream_factory = StreamFactory::build(&settings)?;
657        let fix_app_server = FixApplicationAcceptor {
658            settings,
659            stream_factory,
660        };
661        Ok(fix_app_server)
662    }
663
664    /// Accept an incoming TCP connection and create a FIX engine.
665    ///
666    /// Returns the handle to the created engine, and a channel to receive all valid, incoming application
667    /// messages.
668    pub async fn accept(
669        &mut self,
670    ) -> Result<(FixApplicationHandle, mpsc::UnboundedReceiver<Arc<MsgBuf>>), ApplicationError>
671    {
672        let stream = self.stream_factory.stream().await?;
673        let settings = self.settings.clone();
674        let (request_sender, request_receiver) = mpsc::unbounded_channel::<Request>();
675        let (app_message_event_sender, app_message_event_receiver) =
676            mpsc::unbounded_channel::<Arc<MsgBuf>>();
677        let begin_string = Arc::clone(&self.settings.begin_string);
678
679        tokio::task::spawn(async move {
680            if let Err(e) =
681                fix::spin_session(stream, request_receiver, app_message_event_sender, settings)
682                    .await
683            {
684                eprintln!("{e:?}");
685            }
686        });
687
688        let handle = FixApplicationHandle {
689            request_sender,
690            begin_string,
691        };
692
693        Ok((handle, app_message_event_receiver))
694    }
695}
696
/// The role an engine plays on the TCP connection.
///
/// `StreamFactory::build` matches on this to decide between connecting out and listening.
#[derive(Clone)]
enum FixEngineType {
    /// Assigned by `FixApplicationInitiator::build`; the engine connects out to the address.
    Client,
    /// Assigned by `FixApplicationAcceptor::build`; the engine listens on the address.
    Server,
}
702
703enum StreamFactory {
704    Server(TcpListener),
705    Client(std::net::SocketAddr),
706}
707
708impl StreamFactory {
709    fn build(settings: &SessionSettings) -> Result<Self, std::io::Error> {
710        match settings.engine_type {
711            FixEngineType::Client => Ok(StreamFactory::Client(settings.addr)),
712            FixEngineType::Server => {
713                let socket = TcpSocket::new_v4()?;
714                socket.bind(settings.addr)?;
715                let listener = socket.listen(1024)?;
716                Ok(StreamFactory::Server(listener))
717            }
718        }
719    }
720    async fn stream(&self) -> Result<TcpStream, std::io::Error> {
721        match self {
722            StreamFactory::Server(listener) => {
723                let (stream, _from_addr) = listener.accept().await?;
724                Ok(stream)
725            }
726            StreamFactory::Client(addr) => {
727                let socket = TcpSocket::new_v4()?;
728                Ok(socket.connect(*addr).await?)
729            }
730        }
731    }
732}