use std::{cell::RefCell, collections::HashMap, net::SocketAddr, rc::Rc};
use pin_project::pin_project;
use tokio::net::TcpStream;
use tracing::{Instrument, info, info_span};
use crate::{
Error,
application::{Emitter, EventStream, events_channel},
io::initiator_connection,
messages_storage::MessagesStorage,
session::Session,
session_id::SessionId,
session_state::State,
settings::{SessionSettings, Settings},
};
pub(crate) type ActiveSessionsMap<S> = HashMap<SessionId, Rc<Session<S>>>;
#[pin_project]
pub struct Initiator<S: MessagesStorage> {
id: SessionId,
settings: Settings,
session_settings: SessionSettings,
state: Rc<RefCell<State<S>>>,
active_sessions: Rc<RefCell<ActiveSessionsMap<S>>>,
emitter: Emitter,
#[pin]
event_stream: EventStream,
}
impl<S: MessagesStorage + 'static> Initiator<S> {
pub fn new(
settings: Settings,
session_settings: SessionSettings,
messages_storage: S,
) -> Initiator<S> {
let (emitter, event_stream) = events_channel();
Initiator {
id: session_settings.session_id.clone(),
settings,
session_settings,
state: Rc::new(RefCell::new(State::new(messages_storage))),
active_sessions: Rc::new(RefCell::new(HashMap::new())),
emitter,
event_stream,
}
}
pub async fn connect(&self, socket_addr: impl Into<SocketAddr>) -> Result<(), Error> {
info!("Initiator started");
let addr = socket_addr.into();
let tcp_stream = TcpStream::connect(addr).await?;
tcp_stream.set_nodelay(true)?;
let emitter = self.emitter.clone();
let settings = self.settings.clone();
let session_settings = self.session_settings.clone();
let active_sessions = self.active_sessions.clone();
let state = self.state.clone();
let connection_span = info_span!("connection", %addr);
tokio::task::spawn_local(async move {
initiator_connection(
tcp_stream,
settings,
session_settings,
state,
active_sessions,
emitter,
)
.instrument(connection_span.clone())
.await;
connection_span.in_scope(|| {
info!("Connection closed");
});
});
Ok(())
}
}