easyfix_session/
initiator.rs1use std::{cell::RefCell, collections::HashMap, net::SocketAddr, rc::Rc};
2
3use pin_project::pin_project;
4use tokio::net::TcpStream;
5use tracing::{Instrument, info, info_span};
6
7use crate::{
8 Error,
9 application::{Emitter, EventStream, events_channel},
10 io::initiator_connection,
11 messages_storage::MessagesStorage,
12 session::Session,
13 session_id::SessionId,
14 session_state::State,
15 settings::{SessionSettings, Settings},
16};
17
18pub(crate) type ActiveSessionsMap<S> = HashMap<SessionId, Rc<Session<S>>>;
20
21#[pin_project]
22pub struct Initiator<S: MessagesStorage> {
23 id: SessionId,
24 settings: Settings,
25 session_settings: SessionSettings,
26 state: Rc<RefCell<State<S>>>,
27 active_sessions: Rc<RefCell<ActiveSessionsMap<S>>>,
28 emitter: Emitter,
29 #[pin]
30 event_stream: EventStream,
31}
32
33impl<S: MessagesStorage + 'static> Initiator<S> {
34 pub fn new(
35 settings: Settings,
36 session_settings: SessionSettings,
37 messages_storage: S,
38 ) -> Initiator<S> {
39 let (emitter, event_stream) = events_channel();
40 Initiator {
41 id: session_settings.session_id.clone(),
42 settings,
43 session_settings,
44 state: Rc::new(RefCell::new(State::new(messages_storage))),
45 active_sessions: Rc::new(RefCell::new(HashMap::new())),
46 emitter,
47 event_stream,
48 }
49 }
50
51 pub async fn connect(&self, socket_addr: impl Into<SocketAddr>) -> Result<(), Error> {
52 info!("Initiator started");
53
54 let addr = socket_addr.into();
55 let tcp_stream = TcpStream::connect(addr).await?;
56 tcp_stream.set_nodelay(true)?;
57 let emitter = self.emitter.clone();
58 let settings = self.settings.clone();
59 let session_settings = self.session_settings.clone();
60 let active_sessions = self.active_sessions.clone();
61 let state = self.state.clone();
62
63 let connection_span = info_span!("connection", %addr);
64
65 tokio::task::spawn_local(async move {
66 initiator_connection(
67 tcp_stream,
68 settings,
69 session_settings,
70 state,
71 active_sessions,
72 emitter,
73 )
74 .instrument(connection_span.clone())
75 .await;
76 connection_span.in_scope(|| {
77 info!("Connection closed");
78 });
79 });
80 Ok(())
81 }
82}