1use crate::{
2 id_manager::IDManager,
3 router::Router,
4 session::Session,
5 types::{ConnectionID, GlobalVars},
6 BanManager, ConfigManager, Connection, Result, SessionList,
7};
8use std::sync::Arc;
9use tokio::time::{sleep, Duration, Instant};
10use tokio_util::sync::CancellationToken;
11use tracing::{enabled, error, trace, warn, Level};
12
13pub(crate) struct Handler<State, CState>
16where
17 CState: Send + Sync + Clone + 'static,
18{
19 pub(crate) id: ConnectionID,
21 pub(crate) ban_manager: BanManager,
22 pub(crate) id_manager: IDManager,
23 pub(crate) session_list: SessionList<CState>,
24 pub(crate) config_manager: ConfigManager,
25
26 pub(crate) router: Arc<Router<State, CState>>,
28 pub(crate) state: State,
29 pub(crate) connection_state: CState,
30
31 pub(crate) connection: Connection,
33 pub(crate) cancel_token: CancellationToken,
34 pub(crate) global_vars: GlobalVars,
35}
36
37impl<State: Clone + Send + Sync + 'static, CState: Default + Clone + Send + Sync + 'static>
38 Handler<State, CState>
39{
40 pub(crate) async fn run(mut self) -> Result<()> {
41 let address = if self.config_manager.proxy_protocol() {
42 self.connection.proxy_protocol().await?
43 } else {
44 self.connection.address
45 };
46
47 if self.config_manager.ban_manager_enabled() {
48 self.ban_manager.check_banned(address)?;
49 }
50
51 let (mut reader, tx, handle) = self.connection.init();
52
53 let session_id = self.id_manager.allocate_session_id()?;
54
55 let session_cancel_token = self.cancel_token.child_token();
56
57 let session = Session::new(
58 self.id.clone(),
59 session_id,
60 address,
61 tx,
62 self.config_manager.clone(),
63 session_cancel_token.clone(),
64 self.connection_state,
65 )?;
66
67 trace!(
68 id = ?self.id,
69 ip = &address.to_string(),
70 "Connection initialized",
71 );
72
73 self.session_list.add_miner(address, session.clone());
74
75 let sleep = sleep(Duration::from_secs(
76 self.config_manager.connection_config().inital_timeout,
77 ));
78 tokio::pin!(sleep);
79
80 while !self.cancel_token.is_cancelled() {
83 if session.is_disconnected() {
84 trace!( id = ?self.id, ip = &address.to_string(), "Session disconnected.");
85 break;
86 }
87
88 let maybe_frame = tokio::select! {
89 res = reader.read_frame() => {
90 match res {
91 Err(e) => {
92 warn!(ip = session.ip().to_string(), "Session: {} errored with the following error: {}", session.id(), e);
93 break;
94 },
95 Ok(frame) => frame,
96 }
97 },
98 () = &mut sleep => {
99 if enabled!(Level::DEBUG) {
100 error!( id = &self.id.to_string(), ip = &address.to_string(), "Session Parse Frame Timeout");
101 }
102 break;
103 },
104 () = session_cancel_token.cancelled() => {
107 if enabled!(Level::DEBUG) {
109 error!( id = &self.id.to_string(), ip = &address.to_string(), "Session Disconnected");
110 }
111 break;
112 },
113 () = self.cancel_token.cancelled() => {
114 break;
117 }
118 };
119
120 let Some(frame) = maybe_frame else {
121 break;
122 };
123
124 session.active();
126
127 self.router
131 .call(
132 frame,
133 self.state.clone(),
134 session.clone(),
136 self.global_vars.clone(),
137 )
138 .await;
139
140 sleep.as_mut().reset(Instant::now() + session.timeout());
142 }
143
144 trace!(
145 id = &self.id.to_string(),
146 ip = &address.to_string(),
147 "Connection shutdown started",
148 );
149
150 self.session_list.remove_miner(address);
151 self.id_manager.remove_session_id(session_id);
152
153 if session.needs_ban() {
154 self.ban_manager.add_ban(address);
155 }
156
157 session.shutdown();
158
159 self.cancel_token.cancel();
160
161 if let Err(e) = handle.await {
163 trace!(id = ?self.id, cause = ?e, "Write loop error");
164 }
165
166 trace!(
167 id = ?self.id,
168 ip = &address.to_string(),
169 "Connection shutdown complete",
170 );
171
172 Ok(())
173 }
174}
175
176