stratum_server/
session.rs

1use crate::{
2    config::ConfigManager,
3    types::{ConnectionID, Difficulties, Difficulty, DifficultySettings},
4    Miner, MinerList, Result, SessionID,
5};
6use extended_primitives::Buffer;
7use parking_lot::{Mutex, RwLock};
8use serde::Serialize;
9use std::{
10    fmt::Display,
11    net::SocketAddr,
12    sync::Arc,
13    time::{Duration, Instant, SystemTime},
14};
15use tokio::sync::mpsc::UnboundedSender;
16use tokio_util::sync::CancellationToken;
17use tracing::{debug, error};
18use uuid::Uuid;
19
20//@todo remove this excessive_bools
/// Per-session flags and metadata that can be handed out as a snapshot
/// (see `Session::get_connection_info`).
#[allow(clippy::struct_excessive_bools)]
#[derive(Debug, Clone)]
pub struct SessionInfo {
    /// Set when the client string starts with `btccom-agent/` (see `Session::set_client`).
    pub agent: bool,
    /// Set to `true` by `Session::authorize`.
    pub authorized: bool,
    /// Set to `true` by `Session::subscribe`.
    pub subscribed: bool,
    /// Client string recorded through `Session::set_client`, if one was provided.
    pub client: Option<String>,
    /// Wall-clock time captured when this value was created via `SessionInfo::new`.
    pub session_start: SystemTime,
    // pub state: SessionState,
    /// When `true`, `Session::timeout` reports the one-week timeout.
    pub is_long_timeout: bool,
}
32
33impl Default for SessionInfo {
34    fn default() -> Self {
35        Self::new()
36    }
37}
38
39impl SessionInfo {
40    pub fn new() -> Self {
41        SessionInfo {
42            agent: false,
43            authorized: false,
44            subscribed: false,
45            client: None,
46            session_start: SystemTime::now(),
47            // state: SessionState::Connected,
48            is_long_timeout: false,
49        }
50    }
51}
52
/// Connection status tracked for a session.
#[derive(PartialEq, Eq, Debug)]
pub enum SessionState {
    /// Initial status assigned when a session is created.
    Connected,
    /// Status assigned by `Session::disconnect`.
    Disconnected,
}
58
59#[derive(Debug)]
60pub enum SendInformation {
61    Json(String),
62    Text(String),
63    Raw(Buffer),
64}
65
66impl Display for SendInformation {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        match self {
69            SendInformation::Json(s) => {
70                write!(f, "{}", s)
71            }
72            SendInformation::Text(s) => {
73                write!(f, "{}", s)
74            }
75            SendInformation::Raw(b) => {
76                write!(f, "{}", b)
77            }
78        }
79    }
80}
81
//@todo thought process -> Rather than having these boolean variables that slowly accumulate over
//time, we should add a new "SessionType" type. This would also let us incorporate other types of
//connections that are developed in the future (or that already exist), and it makes pattern
//matching a lot easier imo.
86
//@todo also think about enums for connection status like authenticated/subscribed etc.
88#[derive(Clone)]
89pub struct Session<State> {
90    inner: Arc<Inner<State>>,
91    config_manager: ConfigManager,
92
93    cancel_token: CancellationToken,
94    miner_list: MinerList,
95    shared: Arc<Mutex<Shared>>,
96    difficulty_settings: Arc<RwLock<DifficultySettings>>,
97}
98
99struct Inner<State> {
100    pub id: ConnectionID,
101    pub session_id: SessionID,
102    pub ip: SocketAddr,
103    pub state: State,
104}
105
106//@todo I think we need to have a few more Mutex's here otherwise we run the risk of deadlocks.
107pub(crate) struct Shared {
108    //@todo possibly turn this into an Atomic
109    status: SessionState,
110    //@todo change this (But Later)
111    sender: UnboundedSender<SendInformation>,
112    needs_ban: bool,
113    last_active: Instant,
114    //@todo wrap this in a RwLock I believe
115    info: SessionInfo,
116}
117
118impl<State: Clone> Session<State> {
119    pub fn new(
120        id: ConnectionID,
121        session_id: SessionID,
122        ip: SocketAddr,
123        sender: UnboundedSender<SendInformation>,
124        config_manager: ConfigManager,
125        cancel_token: CancellationToken,
126        state: State,
127    ) -> Result<Self> {
128        let config = config_manager.current_config();
129
130        let shared = Shared {
131            status: SessionState::Connected,
132            last_active: Instant::now(),
133            needs_ban: false,
134            sender,
135            info: SessionInfo::new(),
136        };
137
138        let inner = Inner {
139            id,
140            session_id,
141            ip,
142            state,
143        };
144
145        Ok(Session {
146            config_manager,
147            cancel_token,
148            miner_list: MinerList::new(),
149            shared: Arc::new(Mutex::new(shared)),
150            inner: Arc::new(inner),
151            difficulty_settings: Arc::new(RwLock::new(DifficultySettings {
152                default: Difficulty::from(config.difficulty.initial_difficulty),
153                minimum: Difficulty::from(config.difficulty.minimum_difficulty),
154            })),
155        })
156    }
157
158    #[must_use]
159    pub fn is_disconnected(&self) -> bool {
160        self.shared.lock().status == SessionState::Disconnected
161    }
162
163    pub fn send<T: Serialize>(&self, message: T) -> Result<()> {
164        let shared = self.shared.lock();
165
166        if shared.last_active.elapsed()
167            > Duration::from_secs(
168                self.config_manager
169                    .current_config()
170                    .connection
171                    .active_timeout,
172            )
173        {
174            error!(
175                "Session: {} not active for 10 minutes. Disconnecting",
176                self.inner.id,
177            );
178            drop(shared);
179
180            self.ban();
181
182            //@todo return Error here instead
183            return Ok(());
184        }
185
186        let msg = SendInformation::Json(serde_json::to_string(&message)?);
187
188        debug!("Sending message: {}", msg);
189
190        //@todo it may make sense to keep the sender inside of session here - not sure why it's in
191        //connection like the way it is.
192        //@todo this feels inefficient, maybe we do send bytes here.
193        shared.sender.send(msg)?;
194
195        Ok(())
196    }
197
198    pub fn send_raw(&self, message: Buffer) -> Result<()> {
199        let shared = self.shared.lock();
200
201        shared.sender.send(SendInformation::Raw(message))?;
202
203        Ok(())
204    }
205
206    pub fn shutdown(&self) {
207        if !self.is_disconnected() {
208            self.disconnect();
209
210            self.cancel_token.cancel();
211        }
212    }
213
214    pub fn disconnect(&self) {
215        self.shared.lock().status = SessionState::Disconnected;
216    }
217
218    pub fn ban(&self) {
219        self.shared.lock().needs_ban = true;
220        self.shutdown();
221    }
222
223    #[must_use]
224    pub fn needs_ban(&self) -> bool {
225        self.shared.lock().needs_ban
226    }
227
228    #[must_use]
229    pub fn id(&self) -> &ConnectionID {
230        &self.inner.id
231    }
232
233    pub fn register_worker(
234        &self,
235        session_id: SessionID,
236        client: Option<String>,
237        worker_name: Option<String>,
238        worker_id: Uuid,
239    ) {
240        //@todo has to be an easier way to reuse worker_name here
241        debug!(id = ?self.inner.id, "Registered Worker {worker_id} ({}) Session ID: {session_id}", worker_name.clone().unwrap_or_default());
242
243        let worker = Miner::new(
244            self.id().clone(),
245            worker_id,
246            session_id,
247            client,
248            worker_name,
249            self.config_manager.clone(),
250            self.difficulty_settings.read().clone(),
251        );
252
253        self.miner_list.add_miner(session_id, worker);
254    }
255
256    #[must_use]
257    pub fn unregister_worker(&self, session_id: SessionID) -> Option<(SessionID, Miner)> {
258        self.miner_list.remove_miner(session_id)
259    }
260
261    #[must_use]
262    pub fn get_miner_list(&self) -> MinerList {
263        self.miner_list.clone()
264    }
265
266    #[must_use]
267    pub fn get_worker_by_session_id(&self, session_id: SessionID) -> Option<Miner> {
268        self.miner_list.get_miner_by_id(session_id)
269    }
270
271    //@todo I think we can remove this
272    pub fn update_worker_by_session_id(&self, session_id: SessionID, miner: Miner) {
273        self.miner_list
274            .update_miner_by_session_id(session_id, miner);
275    }
276
277    // ===== Worker Helper functions ===== //
278
279    pub fn set_client(&self, client: &str) {
280        let mut agent = false;
281        let mut long_timeout = false;
282        //@todo check these equal to just STATIC CONSTS for the various things we need to know.
283        //ClientType Enum
284        //@todo we need to do some checking/pruning etc of this client string.
285
286        if client.starts_with("btccom-agent/") {
287            //Agent
288            agent = true;
289            long_timeout = true;
290        }
291
292        let mut shared = self.shared.lock();
293        shared.info.agent = agent;
294        shared.info.client = Some(client.to_string());
295        shared.info.is_long_timeout = long_timeout;
296    }
297
298    #[must_use]
299    pub fn get_connection_info(&self) -> SessionInfo {
300        self.shared.lock().info.clone()
301    }
302
303    #[must_use]
304    pub fn is_long_timeout(&self) -> bool {
305        self.shared.lock().info.is_long_timeout
306    }
307
308    // Returns the current timeout
309    #[must_use]
310    pub fn timeout(&self) -> Duration {
311        let shared = self.shared.lock();
312
313        if shared.info.is_long_timeout {
314            // One Week
315            Duration::from_secs(86400 * 7)
316        } else if shared.info.subscribed && shared.info.authorized {
317            // Ten Minutes
318            Duration::from_secs(600)
319        } else {
320            //@todo let's play with this -> I think 15 might be too short, but if it works lets do
321            //it.
322            Duration::from_secs(15)
323        }
324    }
325
326    #[must_use]
327    pub fn get_session_id(&self) -> SessionID {
328        self.inner.session_id
329    }
330
331    #[must_use]
332    pub fn authorized(&self) -> bool {
333        self.shared.lock().info.authorized
334    }
335
336    pub fn authorize(&self) {
337        self.shared.lock().info.authorized = true;
338    }
339
340    #[must_use]
341    pub fn subscribed(&self) -> bool {
342        self.shared.lock().info.subscribed
343    }
344
345    pub fn subscribe(&self) {
346        self.shared.lock().info.subscribed = true;
347    }
348
349    #[must_use]
350    pub fn is_agent(&self) -> bool {
351        self.shared.lock().info.agent
352    }
353
354    pub fn set_difficulty(&self, session_id: SessionID, difficulty: Difficulty) {
355        if let Some(miner) = self.miner_list.get_miner_by_id(session_id) {
356            miner.set_difficulty(difficulty);
357        }
358    }
359
360    pub fn set_default_difficulty(&self, difficulty: Difficulty) {
361        self.difficulty_settings.write().default = difficulty;
362    }
363
364    //@todo we need to test this
365    pub fn set_minimum_difficulty(&self, difficulty: Difficulty) {
366        //We only want to set the minimum difficulty if it is greater than or equal to the Global
367        //minimum difficulty
368        if difficulty.as_u64() >= self.config_manager.difficulty_config().minimum_difficulty {
369            self.difficulty_settings.write().minimum = difficulty;
370        }
371    }
372
373    #[must_use]
374    pub fn get_difficulties(&self, session_id: SessionID) -> Option<Difficulties> {
375        self.miner_list
376            .get_miner_by_id(session_id)
377            .map(|miner| miner.difficulties())
378    }
379
380    //@todo double check that a reference here will work
381    #[must_use]
382    pub fn state(&self) -> &State {
383        &self.inner.state
384    }
385
386    #[must_use]
387    pub fn update_difficulty(&self, session_id: SessionID) -> Option<Difficulty> {
388        if let Some(miner) = self.miner_list.get_miner_by_id(session_id) {
389            miner.update_difficulty()
390        } else {
391            None
392        }
393    }
394
395    pub(crate) fn active(&self) {
396        self.shared.lock().last_active = Instant::now();
397    }
398
399    #[must_use]
400    pub fn ip(&self) -> SocketAddr {
401        self.inner.ip
402    }
403
404    //@todo Use internal URLs if they exist otherwise use the default URL?
405    //Figure out how to do this gracefully.
406    // pub(crate) fn graceful_shutdown(&self) {
407    //     todo!()
408    // }
409}
410
#[cfg(feature = "test-utils")]
impl<State: Clone> Session<State> {
    /// Placeholder constructor for a test session wrapping `state`.
    ///
    /// # Panics
    ///
    /// Always panics: the mock is not implemented yet. The explicit `todo!`
    /// replaces an empty body that evaluated to `()` and therefore did not
    /// type-check whenever the `test-utils` feature was enabled.
    pub fn mock(state: State) -> Session<State> {
        // Keep the parameter in the signature without an unused-variable warning.
        let _ = state;

        todo!("Session::mock is not implemented yet")
    }
}