Skip to main content

forgefix/
lib.rs

1//! An opinionated FIX 4.2 client library for the buy-side.
2//!
3//! ForgeFIX is an engine that implements a subset of the FIX protocol which allows users to connect
4//! to brokers or exchanges to send and receive messages.
5//!
6//! ## Terminology
7//! * `FIX Connection` -- A single connection to a FIX Session. A network connection is made over TCP,
8//! then a FIX logon handshake is performed to establish the FIX connection. The FIX connection
9//! ends properly with a FIX logout, but is considered ended if the TCP connection breaks.
10//!     * Note, the term 'connection' is overloaded and can also mean TCP connection. When unclear, a
11//! 'connection' will be specified as TCP or FIX.
12//!
13//! * `FIX Session` -- A conceptual construct that represents the bidirectional stream of ordered
14//! messages between two peers. A FIX Session can live across multiple instances of a FIX
15//! connection.
16//!
17//! * `FIX Engine` -- A sub-process running in the background that manages a single FIX connection
18//! to a FIX Session. The engine starts, runs, and ends the FIX connection as defined by the FIX
19//! protocol, and manages all resources that support the connection.
20//!
21//! ## Examples
22//!
23//! ### Asynchronous API
24//! ```no_run
25//! use forgefix::{
26//!     ApplicationError, SessionSettings, EngineFactory, EngineHandle,
27//! };
28//!
29//! #[tokio::main]
30//! async fn main() -> Result<(), ApplicationError> {
31//!     
32//!     // build session settings
33//!     let settings = SessionSettings::builder()
34//!         .with_sender_comp_id("my_id")
35//!         .with_target_comp_id("peer_id")
36//!         .with_store_path("./store".into())
37//!         .with_log_dir("./log".into())
38//!         .with_socket_addr("127.0.0.1:0".parse().unwrap())
39//!         .build()?;
40//!
41//!     // create a FIX engine and intiate TCP connection
42//!     let (handle, mut event_receiver) = EngineFactory::initiator(settings, forgefix::log::FileLoggerFactory)?
43//!         .start()
44//!         .await?;
45//!
46//!     // handle incoming messages in the background...
47//!     tokio::spawn(async move {
48//!         while let Some(msg) = event_receiver.recv().await {
49//!             println!("got an application message: {}", msg);
50//!         }
51//!     });
52//!
53//!     // logon to the FIX session
54//!     handle.logon_async().await?;
55//!
56//!     // send messages here...
57//!
58//!     // logout from the FIX session
59//!     handle.logout_async().await?;
60//!
61//!     Ok(())
62//! }
63//! ```
64//!
65//! ### Synchronous API*
66//! ```no_run
67//! use forgefix::{
68//!     ApplicationError, SessionSettings, EngineFactory, EngineHandle,
69//! };
70//!
71//! fn main() -> Result<(), ApplicationError> {
72//!
73//!     let settings = SessionSettings::builder()
74//!         .with_sender_comp_id("my_id")
75//!         .with_target_comp_id("peer_id")
76//!         .with_store_path("./store".into())
77//!         .with_log_dir("./log".into())
78//!         .with_socket_addr("127.0.0.1:0".parse().unwrap())
79//!         .build()?;
80//!
81//!     let (handle, mut event_receiver) = EngineFactory::initiator(settings, forgefix::log::FileLoggerFactory)?
82//!         .start_sync()?;
83//!
84//!     std::thread::spawn(move || {
85//!         while let Some(msg) = event_receiver.blocking_recv() {
86//!             println!("got an application message: {}", msg);
87//!         }
88//!     });
89//!
90//!     handle.logon_sync()?;
91//!
92//!     // send messages here...
93//!
94//!     handle.logout_sync()?;
95//!     
96//!     Ok(())
97//! }
98//! ```
99//! *When using synchronous API, a tokio runtime is still created internally (see
100//! [`EngineFactory::start_sync`])
101//!
102//! ## Feature Flags
103//!
104//! - `sqlite`: Enables the sqlite3 database backed store. Allows messages and
105//! sequences numbers to be persisted to disk, so ForgeFIX can pick up in the middle of a session
106//! if restarted. Otherwise, every restart results in a new FIX session.
107
108pub mod fix;
109pub mod log;
110
111use fix::encode::MessageBuilder;
112use fix::mem::MsgBuf;
113
114use std::net::SocketAddr;
115use std::path::PathBuf;
116use std::sync::Arc;
117use std::time::Duration;
118
119use thiserror::Error;
120use tokio::net::{TcpListener, TcpSocket, TcpStream};
121use tokio::sync::{mpsc, oneshot};
122
123use chrono::naive::NaiveTime;
124
125enum Request {
126    Logon {
127        resp_sender: oneshot::Sender<bool>,
128    },
129    SendMessage {
130        resp_sender: oneshot::Sender<bool>,
131        builder: MessageBuilder,
132    },
133    Logout {
134        resp_sender: oneshot::Sender<bool>,
135    },
136}
137
138/// Errors that can occur while running ForgeFIX.
139#[derive(Debug, Error)]
140pub enum ApplicationError {
141    #[error("An I/O error occured: {0}")]
142    IoError(#[from] std::io::Error),
143    #[error("Session ended unexpectedly")]
144    SessionEnded,
145    #[error("Logon has failed")]
146    LogonFailed,
147    #[error("Logout has failed")]
148    LogoutFailed,
149    #[error("MessageSend has failed")]
150    SendMessageFailed,
151    #[error("setting `{0}` is required")]
152    SettingRequired(String),
153}
154
155/// A collection of settings used to configurate a FIX session.
156///
157/// `SessionSettings` can be constructed using the [`SessionSettingsBuilder`], or can be constructed explicitly.
158#[derive(Clone)]
159pub struct SessionSettings {
160    begin_string: Arc<String>,
161    sender_comp_id: String,
162    target_comp_id: String,
163    addr: SocketAddr,
164    epoch: Arc<String>,
165    store_path: PathBuf,
166    log_dir: PathBuf,
167    heartbeat_timeout: Duration,
168    start_time: NaiveTime,
169}
170
171/// A builder for easily configuring all the fields of a [`SessionSettings`]
172///
173/// The following settings are required to be set:
174/// * sender comp id
175/// * target comp id
176/// * addr
177/// * store path
178/// * log dir
179#[derive(Default)]
180pub struct SessionSettingsBuilder {
181    sender_comp_id: Option<String>,
182    target_comp_id: Option<String>,
183    addr: Option<SocketAddr>,
184    begin_string: Option<String>,
185    epoch: Option<String>,
186    store_path: Option<PathBuf>,
187    log_dir: Option<PathBuf>,
188    heartbeat_timeout: Option<Duration>,
189    start_time: Option<NaiveTime>,
190}
191
192impl SessionSettingsBuilder {
193    pub fn new() -> SessionSettingsBuilder {
194        Default::default()
195    }
196
197    /// The time the FIX session starts each day.
198    pub fn with_start_time(mut self, start_time: NaiveTime) -> Self {
199        self.set_start_time(start_time);
200        self
201    }
202    pub fn set_start_time(&mut self, start_time: NaiveTime) {
203        self.start_time = Some(start_time);
204    }
205
206    /// The `SenderCompID(49)` that will be included in each message.
207    pub fn with_sender_comp_id(mut self, sender_comp_id: &str) -> Self {
208        self.set_sender_comp_id(sender_comp_id);
209        self
210    }
211    pub fn set_sender_comp_id(&mut self, sender_comp_id: &str) {
212        self.sender_comp_id = Some(sender_comp_id.to_string());
213    }
214
215    /// The `TargetCompID(56)` that will be included in each message.
216    pub fn with_target_comp_id(mut self, target_comp_id: &str) -> Self {
217        self.set_target_comp_id(target_comp_id);
218        self
219    }
220    pub fn set_target_comp_id(&mut self, target_comp_id: &str) {
221        self.target_comp_id = Some(target_comp_id.to_string());
222    }
223
224    /// The address to initiate a connection to, or accept connections on.
225    pub fn with_socket_addr(mut self, addr: SocketAddr) -> Self {
226        self.addr = Some(addr);
227        self
228    }
229    pub fn set_socket_addr(&mut self, addr: SocketAddr) {
230        self.addr = Some(addr);
231    }
232
233    /// The `BeginString(8)` that will be included in each message.
234    pub fn with_begin_string(mut self, begin_string: &str) -> Self {
235        self.set_begin_string(begin_string);
236        self
237    }
238    pub fn set_begin_string(&mut self, begin_string: &str) {
239        self.begin_string = Some(begin_string.to_string());
240    }
241
242    /// A local unique identifier for this FIX session.
243    pub fn with_epoch(mut self, epoch: &str) -> Self {
244        self.set_epoch(epoch);
245        self
246    }
247    pub fn set_epoch(&mut self, epoch: &str) {
248        self.epoch = Some(epoch.to_string());
249    }
250
251    /// The file that should be used as the sqlite database file.
252    pub fn with_store_path(mut self, store_path: PathBuf) -> Self {
253        self.set_store_path(store_path);
254        self
255    }
256    pub fn set_store_path(&mut self, store_path: PathBuf) {
257        self.store_path = Some(store_path);
258    }
259
260    /// The directory that should be used to create log files.
261    ///
262    /// This field is read by [`log::FileLoggerFactory`] to determine where to write log files.
263    /// Custom [`log::LoggerFactory`] implementations may ignore it.
264    pub fn with_log_dir(mut self, log_dir: PathBuf) -> Self {
265        self.set_log_dir(log_dir);
266        self
267    }
268    pub fn set_log_dir(&mut self, log_dir: PathBuf) {
269        self.log_dir = Some(log_dir);
270    }
271
272    /// The timeout length used for sending `Heartbeat<0>` messages.
273    pub fn with_heartbeat_timeout(mut self, hb_timeout: Duration) -> Self {
274        self.set_heartbeat_timeout(hb_timeout);
275        self
276    }
277    pub fn set_heartbeat_timeout(&mut self, hb_timeout: Duration) {
278        self.heartbeat_timeout = Some(hb_timeout);
279    }
280
281    /// Build the [`SessionSettings`] struct.
282    ///
283    /// Returns an `Err(ApplicationError::SettingRequired)` if not all of the required fields
284    /// were set.
285    pub fn build(self) -> Result<SessionSettings, ApplicationError> {
286        let sender_comp_id = self
287            .sender_comp_id
288            .ok_or(ApplicationError::SettingRequired(
289                "sender_comp_id".to_string(),
290            ))?;
291        let target_comp_id = self
292            .target_comp_id
293            .ok_or(ApplicationError::SettingRequired(
294                "target_comp_id".to_string(),
295            ))?;
296        let addr = self
297            .addr
298            .ok_or(ApplicationError::SettingRequired("addr".to_string()))?;
299        let store_path = self
300            .store_path
301            .ok_or(ApplicationError::SettingRequired("store_path".to_string()))?;
302        let log_dir = self
303            .log_dir
304            .ok_or(ApplicationError::SettingRequired("log_dir".to_string()))?;
305
306        Ok(SessionSettings {
307            begin_string: Arc::new(self.begin_string.unwrap_or(String::from("FIX.4.2"))),
308            epoch: Arc::new(
309                self.epoch
310                    .unwrap_or(format!("{}_{}", &sender_comp_id, &target_comp_id)),
311            ),
312            heartbeat_timeout: self.heartbeat_timeout.unwrap_or(Duration::from_secs(30)),
313            start_time: self.start_time.unwrap_or_default(),
314            sender_comp_id,
315            target_comp_id,
316            addr,
317            store_path,
318            log_dir,
319        })
320    }
321}
322
323impl SessionSettings {
324    /// Creates a new [`SessionSettingsBuilder`]
325    pub fn builder() -> SessionSettingsBuilder {
326        SessionSettingsBuilder::new()
327    }
328
329    fn expected_sender_comp_id(&self) -> &str {
330        &self.target_comp_id
331    }
332
333    fn expected_target_comp_id(&self) -> &str {
334        &self.sender_comp_id
335    }
336}
337
338/// A handle on a FIX engine instance.
339///
340/// The [`EngineHandle`] allows for requesting the basic operations of starting the FIX connection, sending
341/// a message to the peer, and ending the connection.
342///
343/// The handle offers asynchronous and synchronous APIs for these operations. As well as functions
344/// that return immedietly with a [`oneshot::Receiver`] that will eventually return the result of the
345/// operation.
346///
347/// The underlying engine could stop running at any moment for a variety of reasons. Only until you
348/// attempt an operation, will you learn the engine has stopped by receiving an
349/// [`ApplicationError::SessionEnded`].
350///
351/// [`EngineHandle`] `impl`'s [`Clone`], [`Send`] and [`Sync`] and therefore multiple
352/// copies of the handle can be made and passed to different threads that can all request messages
353/// to be sent. Only one thread has to call [`end`] for the engine to terminate the connection.
354///
355/// [`oneshot::Receiver`]: https://docs.rs/tokio/latest/tokio/sync/oneshot/struct.Receiver.html
356/// [`end`]: EngineHandle::end
357///
358/// # Example - Multiple Threads
359///
360///```no_run
361/// use forgefix::{
362///     SessionSettings, EngineFactory, ApplicationError
363/// };
364/// use forgefix::fix::{encode::MessageBuilder, fields::MsgType};
365/// # use anyhow::Result;
366/// # #[tokio::main]
367/// # async fn main() -> Result<()> {
368/// #    let settings = SessionSettings::builder()
369/// #        .with_sender_comp_id("my_id")
370/// #        .with_target_comp_id("peer_id")
371/// #        .with_store_path("./store".into())
372/// #        .with_log_dir("./log".into())
373/// #        .with_socket_addr("127.0.0.1:0".parse().unwrap())
374/// #        .build()?;
375///
376/// let (handle, mut receiver) = EngineFactory::initiator(settings, forgefix::log::FileLoggerFactory)?
377///     .start()
378///     .await?;
379/// receiver.close();
380///
381/// // logon to the session
382/// handle.logon_async().await;
383///
384/// // EngineHandle can be cloned
385/// let handle1 = handle.clone();
386/// let handle2 = handle.clone();
387///
388/// // EngineHandle clones can be sent across threads and tasks
389/// let h1 = tokio::spawn(async move {
390///
391///     // thread logic here...
392///
393///     let builder = MessageBuilder::new(
394///         &handle1.begin_string(),
395///         MsgType::ORDER_SINGLE
396///     );
397///     handle1.send_message_async(builder).await
398///
399///     // ...
400/// });
401///
402/// // send to multiple tasks...
403/// let h2 = tokio::spawn(async move {
404///     let builder = MessageBuilder::new(
405///         &handle2.begin_string(),
406///         MsgType::ORDER_SINGLE
407///     );
408///     handle2.send_message_async(builder).await
409/// });
410///
411/// // wait for all threads to finish...
412/// let (res1, res2) = tokio::join!(h1, h2);
413/// res1??;
414/// res2??;
415///     
416/// // logout from the session
417/// handle.logout_async().await?;
418///  #   Ok(())
419/// # }
420///
421///```
422#[derive(Clone)]
423pub struct EngineHandle {
424    request_sender: mpsc::UnboundedSender<Request>,
425    begin_string: Arc<String>,
426}
427
428impl EngineHandle {
429    /// Send a request to the engine to logon to the session and return immediately.
430    ///
431    /// The receiver will eventually yield `true` if the session was successfully logged into, or
432    /// `false` othersize.
433    pub fn logon(&self) -> Result<oneshot::Receiver<bool>, ApplicationError> {
434        if self.request_sender.is_closed() {
435            return Err(ApplicationError::SessionEnded);
436        }
437        let (resp_sender, resp_receiver) = oneshot::channel();
438        let logon_request = Request::Logon { resp_sender };
439        let _ = self.request_sender.send(logon_request);
440        Ok(resp_receiver)
441    }
442    /// Send a request to the engine to logon to the session await asynchronously.
443    pub async fn logon_async(&self) -> Result<(), ApplicationError> {
444        let resp_sender = self.logon()?;
445        if Ok(true) != resp_sender.await {
446            return Err(ApplicationError::LogonFailed);
447        }
448        Ok(())
449    }
450    /// Send a request to the engine to logon to the session, and block until a result is returned.
451    pub fn logon_sync(&self) -> Result<(), ApplicationError> {
452        let resp_receiver = self.logon()?;
453        if Ok(true) != resp_receiver.blocking_recv() {
454            return Err(ApplicationError::LogonFailed);
455        }
456        Ok(())
457    }
458
459    /// Send a request to the engine to send the message in the [`MessageBuilder`] to the peer, and return immediately.
460    ///
461    /// If the request was successfully sent to the engine, a [`oneshot::Receiver`] will be
462    /// returned.
463    ///
464    /// The receiver will yield `true` once the message has successfully sent over the TCP
465    /// connection. It will yeild `false` if a message cannot be sent.
466    ///
467    /// [`oneshot::Receiver`]: https://docs.rs/tokio/latest/tokio/sync/oneshot/struct.Receiver.html
468    pub fn send_message(
469        &self,
470        builder: MessageBuilder,
471    ) -> Result<oneshot::Receiver<bool>, ApplicationError> {
472        if self.request_sender.is_closed() {
473            return Err(ApplicationError::SessionEnded);
474        }
475        let (resp_sender, resp_receiver) = oneshot::channel();
476        let send_message_request = Request::SendMessage {
477            resp_sender,
478            builder,
479        };
480        let _ = self.request_sender.send(send_message_request);
481        Ok(resp_receiver)
482    }
483    /// Send a request to the engine to send the message in `builder` and await asynchronously.
484    pub async fn send_message_async(
485        &self,
486        builder: MessageBuilder,
487    ) -> Result<(), ApplicationError> {
488        let resp_sender = self.send_message(builder)?;
489        if Ok(true) != resp_sender.await {
490            return Err(ApplicationError::SendMessageFailed);
491        }
492        Ok(())
493    }
494    /// Send a request to the engine to send the message in `builder` and block until a result is
495    /// returned.
496    pub fn send_message_sync(&self, builder: MessageBuilder) -> Result<(), ApplicationError> {
497        let resp_receiver = self.send_message(builder)?;
498        if Ok(true) != resp_receiver.blocking_recv() {
499            return Err(ApplicationError::SendMessageFailed);
500        }
501        Ok(())
502    }
503
504    /// Send a request to the engine to logout from the session, and return immediately.
505    ///
506    /// If the request was successfully sent to the engine, a [`oneshot::Receiver`] will be
507    /// returned.
508    ///
509    /// The receiver will yield `true` is the session was logged out from ended without any issues.
510    /// Otherwise it will be `false`.
511    ///
512    /// [`oneshot::Receiver`]: https://docs.rs/tokio/latest/tokio/sync/oneshot/struct.Receiver.html
513    pub fn logout(&self) -> Result<oneshot::Receiver<bool>, ApplicationError> {
514        let (resp_sender, resp_receiver) = oneshot::channel();
515        let logout_request = Request::Logout { resp_sender };
516        let _ = self.request_sender.send(logout_request);
517        Ok(resp_receiver)
518    }
519    /// Send a request to the engine to logout from the session, and await asynchronously.
520    pub async fn logout_async(&self) -> Result<(), ApplicationError> {
521        let resp_sender = self.logout()?;
522        if Ok(true) != resp_sender.await {
523            return Err(ApplicationError::LogoutFailed);
524        }
525        Ok(())
526    }
527    /// Send a request to the engine to logout from the session, and block until a result is
528    /// returned.
529    pub fn logout_sync(&self) -> Result<(), ApplicationError> {
530        let resp_receiver = self.logout()?;
531        if Ok(true) != resp_receiver.blocking_recv() {
532            return Err(ApplicationError::LogoutFailed);
533        }
534        Ok(())
535    }
536
537    /// Get the `BeginString(8)` of this FIX Session. Should generally be `"FIX.4.2"`.
538    pub fn begin_string(&self) -> Arc<String> {
539        Arc::clone(&self.begin_string)
540    }
541}
542
543#[derive(Copy, Clone)]
544enum EngineKind {
545    Acceptor,
546    Initiator,
547}
548
549enum StreamFactory {
550    Server(TcpListener),
551    Client(std::net::SocketAddr),
552}
553
554impl StreamFactory {
555    fn build(settings: &SessionSettings, typ: EngineKind) -> Result<Self, std::io::Error> {
556        match typ {
557            EngineKind::Initiator => Ok(StreamFactory::Client(settings.addr)),
558            EngineKind::Acceptor => {
559                let socket = TcpSocket::new_v4()?;
560                socket.bind(settings.addr)?;
561                let listener = socket.listen(1024)?;
562                Ok(StreamFactory::Server(listener))
563            }
564        }
565    }
566
567    async fn stream(&self) -> Result<TcpStream, std::io::Error> {
568        match self {
569            StreamFactory::Server(listener) => {
570                let (stream, _from_addr) = listener.accept().await?;
571                Ok(stream)
572            }
573            StreamFactory::Client(addr) => {
574                let socket = TcpSocket::new_v4()?;
575                Ok(socket.connect(*addr).await?)
576            }
577        }
578    }
579}
580
581/// A struct that can establish a TCP connections to the peer and create FIX engine instances.
582///
583/// FIX engines come in two flavors: [initiator](EngineFactory::initiator) and [acceptor](EngineFactory::acceptor).
584/// An initiator creates the TCP connection and transmits the first `Logon<35=A>` message. An acceptor listens for incoming TCP
585/// connections and waits for the first `Logon<35=A>` message. See ([FIX spec]) for more details.
586///
587/// [FIX spec]: https://www.fixtrading.org/standards/fix-session-layer-online/
588pub struct EngineFactory<LF> {
589    typ: EngineKind,
590    settings: SessionSettings,
591    stream_factory: StreamFactory,
592    logger_factory: LF,
593}
594
595impl<LF: log::LoggerFactory> EngineFactory<LF> {
596    /// Build an `EngineFactory` that creates an acceptor FIX engine using settings
597    pub fn acceptor(
598        settings: SessionSettings,
599        logger_factory: LF,
600    ) -> Result<Self, ApplicationError> {
601        Self::build(settings, EngineKind::Acceptor, logger_factory)
602    }
603
604    /// Build an `EngineFactory` that creates an initiator FIX engine using settings
605    pub fn initiator(
606        settings: SessionSettings,
607        logger_factory: LF,
608    ) -> Result<Self, ApplicationError> {
609        Self::build(settings, EngineKind::Initiator, logger_factory)
610    }
611
612    fn build(
613        settings: SessionSettings,
614        typ: EngineKind,
615        logger_factory: LF,
616    ) -> Result<Self, ApplicationError> {
617        let stream_factory = StreamFactory::build(&settings, typ)?;
618        Ok(Self {
619            settings,
620            stream_factory,
621            typ,
622            logger_factory,
623        })
624    }
625
626    /// Establish a TCP connection and start the FIX engine with the current asynchronous runtime.
627    ///
628    /// If the connection is successfully established, an [`EngineHandle`] will be returned, and an
629    /// `UnboundedReceiver<Arc<MsgBuf>>` will be returned.
630    ///
631    /// The application handle can be used to logon to, send messages over, and logout from the FIX
632    /// connection.
633    ///
634    /// The receiver is a channel where all incoming, valid application messages can be received.
635    /// If you do not want to use the channel, it is recommended you call [`close`].
636    ///
637    /// [`close`]: tokio::sync::mpsc::UnboundedReceiver::close
638    pub async fn start(
639        &mut self,
640    ) -> Result<(EngineHandle, mpsc::UnboundedReceiver<Arc<MsgBuf>>), ApplicationError> {
641        let stream = self.stream_factory.stream().await?;
642        let settings = self.settings.clone();
643        let logger = self.logger_factory.build(&settings)?;
644
645        let (request_sender, request_receiver) = mpsc::unbounded_channel::<Request>();
646        let (app_message_event_sender, app_message_event_receiver) =
647            mpsc::unbounded_channel::<Arc<MsgBuf>>();
648
649        let begin_string = Arc::clone(&self.settings.begin_string);
650        let typ = self.typ;
651
652        tokio::task::spawn(async move {
653            if let Err(e) = fix::spin_session(
654                stream,
655                request_receiver,
656                app_message_event_sender,
657                settings,
658                typ,
659                logger,
660            )
661            .await
662            {
663                eprintln!("{e:?}");
664            }
665        });
666
667        let handle = EngineHandle {
668            request_sender,
669            begin_string,
670        };
671
672        Ok((handle, app_message_event_receiver))
673    }
674
675    /// Establish a TCP connection and start the FIX engine that will be driven by `runtime`.
676    pub fn start_with_runtime(
677        &mut self,
678        runtime: tokio::runtime::Runtime,
679    ) -> Result<(EngineHandle, mpsc::UnboundedReceiver<Arc<MsgBuf>>), ApplicationError> {
680        let stream = runtime.block_on(self.stream_factory.stream())?;
681        let settings = self.settings.clone();
682        let logger = runtime.block_on(async { self.logger_factory.build(&settings) })?;
683
684        let (request_sender, request_receiver) = mpsc::unbounded_channel::<Request>();
685        let (app_message_event_sender, app_message_event_receiver) =
686            mpsc::unbounded_channel::<Arc<MsgBuf>>();
687
688        let begin_string = Arc::clone(&self.settings.begin_string);
689        let typ = self.typ;
690
691        std::thread::spawn(move || {
692            if let Err(e) = runtime.block_on(fix::spin_session(
693                stream,
694                request_receiver,
695                app_message_event_sender,
696                settings,
697                typ,
698                logger,
699            )) {
700                eprintln!("{e:?}");
701            }
702        });
703
704        let handle = EngineHandle {
705            request_sender,
706            begin_string,
707        };
708
709        Ok((handle, app_message_event_receiver))
710    }
711
712    /// Establish a TCP connection, and a runtime will be created internally to drive the engine.
713    pub fn start_sync(
714        &mut self,
715    ) -> Result<(EngineHandle, mpsc::UnboundedReceiver<Arc<MsgBuf>>), ApplicationError> {
716        let runtime = tokio::runtime::Builder::new_multi_thread()
717            .enable_all()
718            .build()?;
719        self.start_with_runtime(runtime)
720    }
721}