stratum_server/
tcp.rs

1use crate::{
2    id_manager::IDManager,
3    router::Router,
4    session::Session,
5    types::{ConnectionID, GlobalVars},
6    BanManager, ConfigManager, Connection, Result, SessionList,
7};
8use std::sync::Arc;
9use tokio::time::{sleep, Duration, Instant};
10use tokio_util::sync::CancellationToken;
11use tracing::{enabled, error, trace, warn, Level};
12
13//@todo finish up the logging in this
14
15pub(crate) struct Handler<State, CState>
16where
17    CState: Send + Sync + Clone + 'static,
18{
19    //No Cleanup needed
20    pub(crate) id: ConnectionID,
21    pub(crate) ban_manager: BanManager,
22    pub(crate) id_manager: IDManager,
23    pub(crate) session_list: SessionList<CState>,
24    pub(crate) config_manager: ConfigManager,
25
26    // Not sure, but should test
27    pub(crate) router: Arc<Router<State, CState>>,
28    pub(crate) state: State,
29    pub(crate) connection_state: CState,
30
31    // Cleanup needed
32    pub(crate) connection: Connection,
33    pub(crate) cancel_token: CancellationToken,
34    pub(crate) global_vars: GlobalVars,
35}
36
37impl<State: Clone + Send + Sync + 'static, CState: Default + Clone + Send + Sync + 'static>
38    Handler<State, CState>
39{
40    pub(crate) async fn run(mut self) -> Result<()> {
41        let address = if self.config_manager.proxy_protocol() {
42            self.connection.proxy_protocol().await?
43        } else {
44            self.connection.address
45        };
46
47        if self.config_manager.ban_manager_enabled() {
48            self.ban_manager.check_banned(address)?;
49        }
50
51        let (mut reader, tx, handle) = self.connection.init();
52
53        let session_id = self.id_manager.allocate_session_id()?;
54
55        let session_cancel_token = self.cancel_token.child_token();
56
57        let session = Session::new(
58            self.id.clone(),
59            session_id,
60            address,
61            tx,
62            self.config_manager.clone(),
63            session_cancel_token.clone(),
64            self.connection_state,
65        )?;
66
67        trace!(
68            id = ?self.id,
69            ip = &address.to_string(),
70            "Connection initialized",
71        );
72
73        self.session_list.add_miner(address, session.clone());
74
75        let sleep = sleep(Duration::from_secs(
76            self.config_manager.connection_config().inital_timeout,
77        ));
78        tokio::pin!(sleep);
79
80        //@todo we can return a value from this loop -> break can return a value, and so we may
81        //want to return an error if there is one so that we can report it at the end.
82        while !self.cancel_token.is_cancelled() {
83            if session.is_disconnected() {
84                trace!( id = ?self.id, ip = &address.to_string(), "Session disconnected.");
85                break;
86            }
87
88            let maybe_frame = tokio::select! {
89                    res = reader.read_frame() => {
90                        match res {
91                            Err(e) => {
92                                warn!(ip = session.ip().to_string(), "Session: {} errored with the following error: {}", session.id(), e);
93                                break;
94                            },
95                            Ok(frame) => frame,
96                        }
97                    },
98                        () = &mut sleep => {
99            if enabled!(Level::DEBUG) {
100                error!( id = &self.id.to_string(), ip = &address.to_string(), "Session Parse Frame Timeout");
101            }
102                    break;
103                },
104                        //@todo we might want timeouts to reduce difficulty as well here. -> That is
105                        //handled in retarget, so let's check that out.
106                    () = session_cancel_token.cancelled() => {
107                        //@todo work on these errors,
108            if enabled!(Level::DEBUG) {
109                error!( id = &self.id.to_string(), ip = &address.to_string(), "Session Disconnected");
110            }
111                        break;
112                    },
113                    () = self.cancel_token.cancelled() => {
114                        // If a shutdown signal is received, return from `run`.
115                        // This will result in the task terminating.
116                        break;
117                    }
118                };
119
120            let Some(frame) = maybe_frame else {
121                break;
122            };
123
124            //Resets the Session's last active, to detect for unactive connections
125            session.active();
126
127            //@todo if a miner fails a function, like subscribe / authorize we don't catch it, and
128            //they can spam us.
129            //Calls the Stratum method on the router.
130            self.router
131                .call(
132                    frame,
133                    self.state.clone(),
134                    //@todo would it be possible to pass session by reference?
135                    session.clone(),
136                    self.global_vars.clone(),
137                )
138                .await;
139
140            //Reset sleep as later as possible
141            sleep.as_mut().reset(Instant::now() + session.timeout());
142        }
143
144        trace!(
145            id = &self.id.to_string(),
146            ip = &address.to_string(),
147            "Connection shutdown started",
148        );
149
150        self.session_list.remove_miner(address);
151        self.id_manager.remove_session_id(session_id);
152
153        if session.needs_ban() {
154            self.ban_manager.add_ban(address);
155        }
156
157        session.shutdown();
158
159        self.cancel_token.cancel();
160
161        //@todo we should also have a timeout here - but I may change write loop so we'll see
162        if let Err(e) = handle.await {
163            trace!(id = ?self.id, cause = ?e, "Write loop error");
164        }
165
166        trace!(
167            id = ?self.id,
168            ip = &address.to_string(),
169            "Connection shutdown complete",
170        );
171
172        Ok(())
173    }
174}
175
//@todo A big thing that I think we need to focus on today is catching attacks like open sockets
//doing nothing, sockets trying to flood, etc.
//Let's make sure we have an entire folder of tests for "attacks" and make sure that we cover them
//thoroughly.