forgefix/lib.rs
1//! An opinionated FIX 4.2 client library for the buy-side.
2//!
3//! ForgeFIX is an engine that implements a subset of the FIX protocol which allows users to connect
4//! to brokers or exchanges to send and receive messages.
5//!
6//! ## Terminology
7//! * `FIX Connection` -- A single connection to a FIX Session. A network connection is made over TCP,
8//! then a FIX logon handshake is performed to establish the FIX connection. The FIX connection
9//! ends properly with a FIX logout, but is considered ended if the TCP connection breaks.
10//! * Note, the term 'connection' is overloaded and can also mean TCP connection. When unclear, a
11//! 'connection' will be specified as TCP or FIX.
12//!
13//! * `FIX Session` -- A conceptual construct that represents the bidirectional stream of ordered
14//! messages between two peers. A FIX Session can live across multiple instances of a FIX
15//! connection.
16//!
17//! * `FIX Engine` -- A sub-process running in the background that manages a single FIX connection
18//! to a FIX Session. The engine starts, runs, and ends the FIX connection as defined by the FIX
19//! protocol, and manages all resources that support the connection.
20//!
21//! ## Examples
22//!
23//! ### Asynchronous API
24//! ```no_run
25//! use forgefix::{
26//! ApplicationError, SessionSettings, EngineFactory, EngineHandle,
27//! };
28//!
29//! #[tokio::main]
30//! async fn main() -> Result<(), ApplicationError> {
31//!
32//! // build session settings
33//! let settings = SessionSettings::builder()
34//! .with_sender_comp_id("my_id")
35//! .with_target_comp_id("peer_id")
36//! .with_store_path("./store".into())
37//! .with_log_dir("./log".into())
38//! .with_socket_addr("127.0.0.1:0".parse().unwrap())
39//! .build()?;
40//!
41//! // create a FIX engine and initiate the TCP connection
42//! let (handle, mut event_receiver) = EngineFactory::initiator(settings, forgefix::log::FileLoggerFactory)?
43//! .start()
44//! .await?;
45//!
46//! // handle incoming messages in the background...
47//! tokio::spawn(async move {
48//! while let Some(msg) = event_receiver.recv().await {
49//! println!("got an application message: {}", msg);
50//! }
51//! });
52//!
53//! // logon to the FIX session
54//! handle.logon_async().await?;
55//!
56//! // send messages here...
57//!
58//! // logout from the FIX session
59//! handle.logout_async().await?;
60//!
61//! Ok(())
62//! }
63//! ```
64//!
65//! ### Synchronous API*
66//! ```no_run
67//! use forgefix::{
68//! ApplicationError, SessionSettings, EngineFactory, EngineHandle,
69//! };
70//!
71//! fn main() -> Result<(), ApplicationError> {
72//!
73//! let settings = SessionSettings::builder()
74//! .with_sender_comp_id("my_id")
75//! .with_target_comp_id("peer_id")
76//! .with_store_path("./store".into())
77//! .with_log_dir("./log".into())
78//! .with_socket_addr("127.0.0.1:0".parse().unwrap())
79//! .build()?;
80//!
81//! let (handle, mut event_receiver) = EngineFactory::initiator(settings, forgefix::log::FileLoggerFactory)?
82//! .start_sync()?;
83//!
84//! std::thread::spawn(move || {
85//! while let Some(msg) = event_receiver.blocking_recv() {
86//! println!("got an application message: {}", msg);
87//! }
88//! });
89//!
90//! handle.logon_sync()?;
91//!
92//! // send messages here...
93//!
94//! handle.logout_sync()?;
95//!
96//! Ok(())
97//! }
98//! ```
99//! *When using synchronous API, a tokio runtime is still created internally (see
100//! [`EngineFactory::start_sync`])
101//!
102//! ## Feature Flags
103//!
104//! - `sqlite`: Enables the sqlite3 database backed store. Allows messages and
105//! sequence numbers to be persisted to disk, so ForgeFIX can pick up in the middle of a session
106//! if restarted. Otherwise, every restart results in a new FIX session.
107
108pub mod fix;
109pub mod log;
110
111use fix::encode::MessageBuilder;
112use fix::mem::MsgBuf;
113
114use std::net::SocketAddr;
115use std::path::PathBuf;
116use std::sync::Arc;
117use std::time::Duration;
118
119use thiserror::Error;
120use tokio::net::{TcpListener, TcpSocket, TcpStream};
121use tokio::sync::{mpsc, oneshot};
122
123use chrono::naive::NaiveTime;
124
125enum Request {
126 Logon {
127 resp_sender: oneshot::Sender<bool>,
128 },
129 SendMessage {
130 resp_sender: oneshot::Sender<bool>,
131 builder: MessageBuilder,
132 },
133 Logout {
134 resp_sender: oneshot::Sender<bool>,
135 },
136}
137
138/// Errors that can occur while running ForgeFIX.
139#[derive(Debug, Error)]
140pub enum ApplicationError {
141 #[error("An I/O error occured: {0}")]
142 IoError(#[from] std::io::Error),
143 #[error("Session ended unexpectedly")]
144 SessionEnded,
145 #[error("Logon has failed")]
146 LogonFailed,
147 #[error("Logout has failed")]
148 LogoutFailed,
149 #[error("MessageSend has failed")]
150 SendMessageFailed,
151 #[error("setting `{0}` is required")]
152 SettingRequired(String),
153}
154
155/// A collection of settings used to configurate a FIX session.
156///
157/// `SessionSettings` can be constructed using the [`SessionSettingsBuilder`], or can be constructed explicitly.
158#[derive(Clone)]
159pub struct SessionSettings {
160 begin_string: Arc<String>,
161 sender_comp_id: String,
162 target_comp_id: String,
163 addr: SocketAddr,
164 epoch: Arc<String>,
165 store_path: PathBuf,
166 log_dir: PathBuf,
167 heartbeat_timeout: Duration,
168 start_time: NaiveTime,
169}
170
171/// A builder for easily configuring all the fields of a [`SessionSettings`]
172///
173/// The following settings are required to be set:
174/// * sender comp id
175/// * target comp id
176/// * addr
177/// * store path
178/// * log dir
179#[derive(Default)]
180pub struct SessionSettingsBuilder {
181 sender_comp_id: Option<String>,
182 target_comp_id: Option<String>,
183 addr: Option<SocketAddr>,
184 begin_string: Option<String>,
185 epoch: Option<String>,
186 store_path: Option<PathBuf>,
187 log_dir: Option<PathBuf>,
188 heartbeat_timeout: Option<Duration>,
189 start_time: Option<NaiveTime>,
190}
191
192impl SessionSettingsBuilder {
193 pub fn new() -> SessionSettingsBuilder {
194 Default::default()
195 }
196
197 /// The time the FIX session starts each day.
198 pub fn with_start_time(mut self, start_time: NaiveTime) -> Self {
199 self.set_start_time(start_time);
200 self
201 }
202 pub fn set_start_time(&mut self, start_time: NaiveTime) {
203 self.start_time = Some(start_time);
204 }
205
206 /// The `SenderCompID(49)` that will be included in each message.
207 pub fn with_sender_comp_id(mut self, sender_comp_id: &str) -> Self {
208 self.set_sender_comp_id(sender_comp_id);
209 self
210 }
211 pub fn set_sender_comp_id(&mut self, sender_comp_id: &str) {
212 self.sender_comp_id = Some(sender_comp_id.to_string());
213 }
214
215 /// The `TargetCompID(56)` that will be included in each message.
216 pub fn with_target_comp_id(mut self, target_comp_id: &str) -> Self {
217 self.set_target_comp_id(target_comp_id);
218 self
219 }
220 pub fn set_target_comp_id(&mut self, target_comp_id: &str) {
221 self.target_comp_id = Some(target_comp_id.to_string());
222 }
223
224 /// The address to initiate a connection to, or accept connections on.
225 pub fn with_socket_addr(mut self, addr: SocketAddr) -> Self {
226 self.addr = Some(addr);
227 self
228 }
229 pub fn set_socket_addr(&mut self, addr: SocketAddr) {
230 self.addr = Some(addr);
231 }
232
233 /// The `BeginString(8)` that will be included in each message.
234 pub fn with_begin_string(mut self, begin_string: &str) -> Self {
235 self.set_begin_string(begin_string);
236 self
237 }
238 pub fn set_begin_string(&mut self, begin_string: &str) {
239 self.begin_string = Some(begin_string.to_string());
240 }
241
242 /// A local unique identifier for this FIX session.
243 pub fn with_epoch(mut self, epoch: &str) -> Self {
244 self.set_epoch(epoch);
245 self
246 }
247 pub fn set_epoch(&mut self, epoch: &str) {
248 self.epoch = Some(epoch.to_string());
249 }
250
251 /// The file that should be used as the sqlite database file.
252 pub fn with_store_path(mut self, store_path: PathBuf) -> Self {
253 self.set_store_path(store_path);
254 self
255 }
256 pub fn set_store_path(&mut self, store_path: PathBuf) {
257 self.store_path = Some(store_path);
258 }
259
260 /// The directory that should be used to create log files.
261 ///
262 /// This field is read by [`log::FileLoggerFactory`] to determine where to write log files.
263 /// Custom [`log::LoggerFactory`] implementations may ignore it.
264 pub fn with_log_dir(mut self, log_dir: PathBuf) -> Self {
265 self.set_log_dir(log_dir);
266 self
267 }
268 pub fn set_log_dir(&mut self, log_dir: PathBuf) {
269 self.log_dir = Some(log_dir);
270 }
271
272 /// The timeout length used for sending `Heartbeat<0>` messages.
273 pub fn with_heartbeat_timeout(mut self, hb_timeout: Duration) -> Self {
274 self.set_heartbeat_timeout(hb_timeout);
275 self
276 }
277 pub fn set_heartbeat_timeout(&mut self, hb_timeout: Duration) {
278 self.heartbeat_timeout = Some(hb_timeout);
279 }
280
281 /// Build the [`SessionSettings`] struct.
282 ///
283 /// Returns an `Err(ApplicationError::SettingRequired)` if not all of the required fields
284 /// were set.
285 pub fn build(self) -> Result<SessionSettings, ApplicationError> {
286 let sender_comp_id = self
287 .sender_comp_id
288 .ok_or(ApplicationError::SettingRequired(
289 "sender_comp_id".to_string(),
290 ))?;
291 let target_comp_id = self
292 .target_comp_id
293 .ok_or(ApplicationError::SettingRequired(
294 "target_comp_id".to_string(),
295 ))?;
296 let addr = self
297 .addr
298 .ok_or(ApplicationError::SettingRequired("addr".to_string()))?;
299 let store_path = self
300 .store_path
301 .ok_or(ApplicationError::SettingRequired("store_path".to_string()))?;
302 let log_dir = self
303 .log_dir
304 .ok_or(ApplicationError::SettingRequired("log_dir".to_string()))?;
305
306 Ok(SessionSettings {
307 begin_string: Arc::new(self.begin_string.unwrap_or(String::from("FIX.4.2"))),
308 epoch: Arc::new(
309 self.epoch
310 .unwrap_or(format!("{}_{}", &sender_comp_id, &target_comp_id)),
311 ),
312 heartbeat_timeout: self.heartbeat_timeout.unwrap_or(Duration::from_secs(30)),
313 start_time: self.start_time.unwrap_or_default(),
314 sender_comp_id,
315 target_comp_id,
316 addr,
317 store_path,
318 log_dir,
319 })
320 }
321}
322
323impl SessionSettings {
324 /// Creates a new [`SessionSettingsBuilder`]
325 pub fn builder() -> SessionSettingsBuilder {
326 SessionSettingsBuilder::new()
327 }
328
329 fn expected_sender_comp_id(&self) -> &str {
330 &self.target_comp_id
331 }
332
333 fn expected_target_comp_id(&self) -> &str {
334 &self.sender_comp_id
335 }
336}
337
338/// A handle on a FIX engine instance.
339///
340/// The [`EngineHandle`] allows for requesting the basic operations of starting the FIX connection, sending
341/// a message to the peer, and ending the connection.
342///
343/// The handle offers asynchronous and synchronous APIs for these operations, as well as functions
344/// that return immediately with a [`oneshot::Receiver`] that will eventually yield the result of the
345/// operation.
346///
347/// The underlying engine could stop running at any moment for a variety of reasons. Only when you
348/// attempt an operation will you learn that the engine has stopped, by receiving an
349/// [`ApplicationError::SessionEnded`].
350///
351/// [`EngineHandle`] implements [`Clone`], [`Send`] and [`Sync`], and therefore multiple
352/// copies of the handle can be made and passed to different threads that can all request messages
353/// to be sent. Only one thread has to call [`logout`] for the engine to terminate the connection.
354///
355/// [`oneshot::Receiver`]: https://docs.rs/tokio/latest/tokio/sync/oneshot/struct.Receiver.html
356/// [`logout`]: EngineHandle::logout
357///
358/// # Example - Multiple Threads
359///
360///```no_run
361/// use forgefix::{
362/// SessionSettings, EngineFactory, ApplicationError
363/// };
364/// use forgefix::fix::{encode::MessageBuilder, fields::MsgType};
365/// # use anyhow::Result;
366/// # #[tokio::main]
367/// # async fn main() -> Result<()> {
368/// # let settings = SessionSettings::builder()
369/// # .with_sender_comp_id("my_id")
370/// # .with_target_comp_id("peer_id")
371/// # .with_store_path("./store".into())
372/// # .with_log_dir("./log".into())
373/// # .with_socket_addr("127.0.0.1:0".parse().unwrap())
374/// # .build()?;
375///
376/// let (handle, mut receiver) = EngineFactory::initiator(settings, forgefix::log::FileLoggerFactory)?
377/// .start()
378/// .await?;
379/// receiver.close();
380///
381/// // logon to the session
382/// handle.logon_async().await;
383///
384/// // EngineHandle can be cloned
385/// let handle1 = handle.clone();
386/// let handle2 = handle.clone();
387///
388/// // EngineHandle clones can be sent across threads and tasks
389/// let h1 = tokio::spawn(async move {
390///
391/// // thread logic here...
392///
393/// let builder = MessageBuilder::new(
394/// &handle1.begin_string(),
395/// MsgType::ORDER_SINGLE
396/// );
397/// handle1.send_message_async(builder).await
398///
399/// // ...
400/// });
401///
402/// // send to multiple tasks...
403/// let h2 = tokio::spawn(async move {
404/// let builder = MessageBuilder::new(
405/// &handle2.begin_string(),
406/// MsgType::ORDER_SINGLE
407/// );
408/// handle2.send_message_async(builder).await
409/// });
410///
411/// // wait for all threads to finish...
412/// let (res1, res2) = tokio::join!(h1, h2);
413/// res1??;
414/// res2??;
415///
416/// // logout from the session
417/// handle.logout_async().await?;
418/// # Ok(())
419/// # }
420///
421///```
422#[derive(Clone)]
423pub struct EngineHandle {
424 request_sender: mpsc::UnboundedSender<Request>,
425 begin_string: Arc<String>,
426}
427
428impl EngineHandle {
429 /// Send a request to the engine to logon to the session and return immediately.
430 ///
431 /// The receiver will eventually yield `true` if the session was successfully logged into, or
432 /// `false` othersize.
433 pub fn logon(&self) -> Result<oneshot::Receiver<bool>, ApplicationError> {
434 if self.request_sender.is_closed() {
435 return Err(ApplicationError::SessionEnded);
436 }
437 let (resp_sender, resp_receiver) = oneshot::channel();
438 let logon_request = Request::Logon { resp_sender };
439 let _ = self.request_sender.send(logon_request);
440 Ok(resp_receiver)
441 }
442 /// Send a request to the engine to logon to the session await asynchronously.
443 pub async fn logon_async(&self) -> Result<(), ApplicationError> {
444 let resp_sender = self.logon()?;
445 if Ok(true) != resp_sender.await {
446 return Err(ApplicationError::LogonFailed);
447 }
448 Ok(())
449 }
450 /// Send a request to the engine to logon to the session, and block until a result is returned.
451 pub fn logon_sync(&self) -> Result<(), ApplicationError> {
452 let resp_receiver = self.logon()?;
453 if Ok(true) != resp_receiver.blocking_recv() {
454 return Err(ApplicationError::LogonFailed);
455 }
456 Ok(())
457 }
458
459 /// Send a request to the engine to send the message in the [`MessageBuilder`] to the peer, and return immediately.
460 ///
461 /// If the request was successfully sent to the engine, a [`oneshot::Receiver`] will be
462 /// returned.
463 ///
464 /// The receiver will yield `true` once the message has successfully sent over the TCP
465 /// connection. It will yeild `false` if a message cannot be sent.
466 ///
467 /// [`oneshot::Receiver`]: https://docs.rs/tokio/latest/tokio/sync/oneshot/struct.Receiver.html
468 pub fn send_message(
469 &self,
470 builder: MessageBuilder,
471 ) -> Result<oneshot::Receiver<bool>, ApplicationError> {
472 if self.request_sender.is_closed() {
473 return Err(ApplicationError::SessionEnded);
474 }
475 let (resp_sender, resp_receiver) = oneshot::channel();
476 let send_message_request = Request::SendMessage {
477 resp_sender,
478 builder,
479 };
480 let _ = self.request_sender.send(send_message_request);
481 Ok(resp_receiver)
482 }
483 /// Send a request to the engine to send the message in `builder` and await asynchronously.
484 pub async fn send_message_async(
485 &self,
486 builder: MessageBuilder,
487 ) -> Result<(), ApplicationError> {
488 let resp_sender = self.send_message(builder)?;
489 if Ok(true) != resp_sender.await {
490 return Err(ApplicationError::SendMessageFailed);
491 }
492 Ok(())
493 }
494 /// Send a request to the engine to send the message in `builder` and block until a result is
495 /// returned.
496 pub fn send_message_sync(&self, builder: MessageBuilder) -> Result<(), ApplicationError> {
497 let resp_receiver = self.send_message(builder)?;
498 if Ok(true) != resp_receiver.blocking_recv() {
499 return Err(ApplicationError::SendMessageFailed);
500 }
501 Ok(())
502 }
503
504 /// Send a request to the engine to logout from the session, and return immediately.
505 ///
506 /// If the request was successfully sent to the engine, a [`oneshot::Receiver`] will be
507 /// returned.
508 ///
509 /// The receiver will yield `true` is the session was logged out from ended without any issues.
510 /// Otherwise it will be `false`.
511 ///
512 /// [`oneshot::Receiver`]: https://docs.rs/tokio/latest/tokio/sync/oneshot/struct.Receiver.html
513 pub fn logout(&self) -> Result<oneshot::Receiver<bool>, ApplicationError> {
514 let (resp_sender, resp_receiver) = oneshot::channel();
515 let logout_request = Request::Logout { resp_sender };
516 let _ = self.request_sender.send(logout_request);
517 Ok(resp_receiver)
518 }
519 /// Send a request to the engine to logout from the session, and await asynchronously.
520 pub async fn logout_async(&self) -> Result<(), ApplicationError> {
521 let resp_sender = self.logout()?;
522 if Ok(true) != resp_sender.await {
523 return Err(ApplicationError::LogoutFailed);
524 }
525 Ok(())
526 }
527 /// Send a request to the engine to logout from the session, and block until a result is
528 /// returned.
529 pub fn logout_sync(&self) -> Result<(), ApplicationError> {
530 let resp_receiver = self.logout()?;
531 if Ok(true) != resp_receiver.blocking_recv() {
532 return Err(ApplicationError::LogoutFailed);
533 }
534 Ok(())
535 }
536
537 /// Get the `BeginString(8)` of this FIX Session. Should generally be `"FIX.4.2"`.
538 pub fn begin_string(&self) -> Arc<String> {
539 Arc::clone(&self.begin_string)
540 }
541}
542
/// Which side of the TCP connection an engine plays.
#[derive(Clone, Copy)]
enum EngineKind {
    /// The engine listens for an incoming TCP connection.
    Acceptor,
    /// The engine opens the TCP connection to the peer.
    Initiator,
}
548
549enum StreamFactory {
550 Server(TcpListener),
551 Client(std::net::SocketAddr),
552}
553
554impl StreamFactory {
555 fn build(settings: &SessionSettings, typ: EngineKind) -> Result<Self, std::io::Error> {
556 match typ {
557 EngineKind::Initiator => Ok(StreamFactory::Client(settings.addr)),
558 EngineKind::Acceptor => {
559 let socket = TcpSocket::new_v4()?;
560 socket.bind(settings.addr)?;
561 let listener = socket.listen(1024)?;
562 Ok(StreamFactory::Server(listener))
563 }
564 }
565 }
566
567 async fn stream(&self) -> Result<TcpStream, std::io::Error> {
568 match self {
569 StreamFactory::Server(listener) => {
570 let (stream, _from_addr) = listener.accept().await?;
571 Ok(stream)
572 }
573 StreamFactory::Client(addr) => {
574 let socket = TcpSocket::new_v4()?;
575 Ok(socket.connect(*addr).await?)
576 }
577 }
578 }
579}
580
581/// A struct that can establish a TCP connections to the peer and create FIX engine instances.
582///
583/// FIX engines come in two flavors: [initiator](EngineFactory::initiator) and [acceptor](EngineFactory::acceptor).
584/// An initiator creates the TCP connection and transmits the first `Logon<35=A>` message. An acceptor listens for incoming TCP
585/// connections and waits for the first `Logon<35=A>` message. See ([FIX spec]) for more details.
586///
587/// [FIX spec]: https://www.fixtrading.org/standards/fix-session-layer-online/
588pub struct EngineFactory<LF> {
589 typ: EngineKind,
590 settings: SessionSettings,
591 stream_factory: StreamFactory,
592 logger_factory: LF,
593}
594
595impl<LF: log::LoggerFactory> EngineFactory<LF> {
596 /// Build an `EngineFactory` that creates an acceptor FIX engine using settings
597 pub fn acceptor(
598 settings: SessionSettings,
599 logger_factory: LF,
600 ) -> Result<Self, ApplicationError> {
601 Self::build(settings, EngineKind::Acceptor, logger_factory)
602 }
603
604 /// Build an `EngineFactory` that creates an initiator FIX engine using settings
605 pub fn initiator(
606 settings: SessionSettings,
607 logger_factory: LF,
608 ) -> Result<Self, ApplicationError> {
609 Self::build(settings, EngineKind::Initiator, logger_factory)
610 }
611
612 fn build(
613 settings: SessionSettings,
614 typ: EngineKind,
615 logger_factory: LF,
616 ) -> Result<Self, ApplicationError> {
617 let stream_factory = StreamFactory::build(&settings, typ)?;
618 Ok(Self {
619 settings,
620 stream_factory,
621 typ,
622 logger_factory,
623 })
624 }
625
626 /// Establish a TCP connection and start the FIX engine with the current asynchronous runtime.
627 ///
628 /// If the connection is successfully established, an [`EngineHandle`] will be returned, and an
629 /// `UnboundedReceiver<Arc<MsgBuf>>` will be returned.
630 ///
631 /// The application handle can be used to logon to, send messages over, and logout from the FIX
632 /// connection.
633 ///
634 /// The receiver is a channel where all incoming, valid application messages can be received.
635 /// If you do not want to use the channel, it is recommended you call [`close`].
636 ///
637 /// [`close`]: tokio::sync::mpsc::UnboundedReceiver::close
638 pub async fn start(
639 &mut self,
640 ) -> Result<(EngineHandle, mpsc::UnboundedReceiver<Arc<MsgBuf>>), ApplicationError> {
641 let stream = self.stream_factory.stream().await?;
642 let settings = self.settings.clone();
643 let logger = self.logger_factory.build(&settings)?;
644
645 let (request_sender, request_receiver) = mpsc::unbounded_channel::<Request>();
646 let (app_message_event_sender, app_message_event_receiver) =
647 mpsc::unbounded_channel::<Arc<MsgBuf>>();
648
649 let begin_string = Arc::clone(&self.settings.begin_string);
650 let typ = self.typ;
651
652 tokio::task::spawn(async move {
653 if let Err(e) = fix::spin_session(
654 stream,
655 request_receiver,
656 app_message_event_sender,
657 settings,
658 typ,
659 logger,
660 )
661 .await
662 {
663 eprintln!("{e:?}");
664 }
665 });
666
667 let handle = EngineHandle {
668 request_sender,
669 begin_string,
670 };
671
672 Ok((handle, app_message_event_receiver))
673 }
674
675 /// Establish a TCP connection and start the FIX engine that will be driven by `runtime`.
676 pub fn start_with_runtime(
677 &mut self,
678 runtime: tokio::runtime::Runtime,
679 ) -> Result<(EngineHandle, mpsc::UnboundedReceiver<Arc<MsgBuf>>), ApplicationError> {
680 let stream = runtime.block_on(self.stream_factory.stream())?;
681 let settings = self.settings.clone();
682 let logger = runtime.block_on(async { self.logger_factory.build(&settings) })?;
683
684 let (request_sender, request_receiver) = mpsc::unbounded_channel::<Request>();
685 let (app_message_event_sender, app_message_event_receiver) =
686 mpsc::unbounded_channel::<Arc<MsgBuf>>();
687
688 let begin_string = Arc::clone(&self.settings.begin_string);
689 let typ = self.typ;
690
691 std::thread::spawn(move || {
692 if let Err(e) = runtime.block_on(fix::spin_session(
693 stream,
694 request_receiver,
695 app_message_event_sender,
696 settings,
697 typ,
698 logger,
699 )) {
700 eprintln!("{e:?}");
701 }
702 });
703
704 let handle = EngineHandle {
705 request_sender,
706 begin_string,
707 };
708
709 Ok((handle, app_message_event_receiver))
710 }
711
712 /// Establish a TCP connection, and a runtime will be created internally to drive the engine.
713 pub fn start_sync(
714 &mut self,
715 ) -> Result<(EngineHandle, mpsc::UnboundedReceiver<Arc<MsgBuf>>), ApplicationError> {
716 let runtime = tokio::runtime::Builder::new_multi_thread()
717 .enable_all()
718 .build()?;
719 self.start_with_runtime(runtime)
720 }
721}