easyfix_session/
initiator.rs

1use std::{cell::RefCell, collections::HashMap, net::SocketAddr, rc::Rc};
2
3use pin_project::pin_project;
4use tokio::net::TcpStream;
5use tracing::{Instrument, info, info_span};
6
7use crate::{
8    Error,
9    application::{Emitter, EventStream, events_channel},
10    io::initiator_connection,
11    messages_storage::MessagesStorage,
12    session::Session,
13    session_id::SessionId,
14    session_state::State,
15    settings::{SessionSettings, Settings},
16};
17
// TODO: Same as in Acceptor; no need for the duplicate.
/// Map from a [`SessionId`] to the reference-counted [`Session`] registered
/// under it.
pub(crate) type ActiveSessionsMap<S> = HashMap<SessionId, Rc<Session<S>>>;
20
/// Initiating side of a session.
///
/// Holds the configuration and the shared, single-threaded (`Rc<RefCell<..>>`)
/// state that `connect` clones into the connection task it spawns.
#[pin_project]
pub struct Initiator<S: MessagesStorage> {
    /// Session identifier, cloned from `session_settings.session_id` in `new`.
    id: SessionId,
    /// Settings cloned into every connection task.
    settings: Settings,
    /// Per-session settings cloned into every connection task.
    session_settings: SessionSettings,
    /// Session state built around the messages storage passed to `new`;
    /// shared with the connection task.
    state: Rc<RefCell<State<S>>>,
    /// Session map shared with the connection task; starts out empty.
    active_sessions: Rc<RefCell<ActiveSessionsMap<S>>>,
    /// Emitting half of the pair returned by `events_channel()`; a clone is
    /// handed to the connection task.
    emitter: Emitter,
    /// Stream half of the pair returned by `events_channel()`. Marked `#[pin]`
    /// so `pin_project` exposes it through a pinned projection.
    #[pin]
    event_stream: EventStream,
}
32
33impl<S: MessagesStorage + 'static> Initiator<S> {
34    pub fn new(
35        settings: Settings,
36        session_settings: SessionSettings,
37        messages_storage: S,
38    ) -> Initiator<S> {
39        let (emitter, event_stream) = events_channel();
40        Initiator {
41            id: session_settings.session_id.clone(),
42            settings,
43            session_settings,
44            state: Rc::new(RefCell::new(State::new(messages_storage))),
45            active_sessions: Rc::new(RefCell::new(HashMap::new())),
46            emitter,
47            event_stream,
48        }
49    }
50
51    pub async fn connect(&self, socket_addr: impl Into<SocketAddr>) -> Result<(), Error> {
52        info!("Initiator started");
53
54        let addr = socket_addr.into();
55        let tcp_stream = TcpStream::connect(addr).await?;
56        tcp_stream.set_nodelay(true)?;
57        let emitter = self.emitter.clone();
58        let settings = self.settings.clone();
59        let session_settings = self.session_settings.clone();
60        let active_sessions = self.active_sessions.clone();
61        let state = self.state.clone();
62
63        let connection_span = info_span!("connection", %addr);
64
65        tokio::task::spawn_local(async move {
66            initiator_connection(
67                tcp_stream,
68                settings,
69                session_settings,
70                state,
71                active_sessions,
72                emitter,
73            )
74            .instrument(connection_span.clone())
75            .await;
76            connection_span.in_scope(|| {
77                info!("Connection closed");
78            });
79        });
80        Ok(())
81    }
82}