1use crate::{
2 config::ConfigManager,
3 types::{ConnectionID, Difficulties, Difficulty, DifficultySettings},
4 Miner, MinerList, Result, SessionID,
5};
6use extended_primitives::Buffer;
7use parking_lot::{Mutex, RwLock};
8use serde::Serialize;
9use std::{
10 fmt::Display,
11 net::SocketAddr,
12 sync::Arc,
13 time::{Duration, Instant, SystemTime},
14};
15use tokio::sync::mpsc::UnboundedSender;
16use tokio_util::sync::CancellationToken;
17use tracing::{debug, error};
18use uuid::Uuid;
19
20#[allow(clippy::struct_excessive_bools)]
22#[derive(Debug, Clone)]
23pub struct SessionInfo {
24 pub agent: bool,
25 pub authorized: bool,
26 pub subscribed: bool,
27 pub client: Option<String>,
28 pub session_start: SystemTime,
29 pub is_long_timeout: bool,
31}
32
33impl Default for SessionInfo {
34 fn default() -> Self {
35 Self::new()
36 }
37}
38
39impl SessionInfo {
40 pub fn new() -> Self {
41 SessionInfo {
42 agent: false,
43 authorized: false,
44 subscribed: false,
45 client: None,
46 session_start: SystemTime::now(),
47 is_long_timeout: false,
49 }
50 }
51}
52
53#[derive(PartialEq, Eq, Debug)]
54pub enum SessionState {
55 Connected,
56 Disconnected,
57}
58
59#[derive(Debug)]
60pub enum SendInformation {
61 Json(String),
62 Text(String),
63 Raw(Buffer),
64}
65
66impl Display for SendInformation {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 match self {
69 SendInformation::Json(s) => {
70 write!(f, "{}", s)
71 }
72 SendInformation::Text(s) => {
73 write!(f, "{}", s)
74 }
75 SendInformation::Raw(b) => {
76 write!(f, "{}", b)
77 }
78 }
79 }
80}
81
82#[derive(Clone)]
89pub struct Session<State> {
90 inner: Arc<Inner<State>>,
91 config_manager: ConfigManager,
92
93 cancel_token: CancellationToken,
94 miner_list: MinerList,
95 shared: Arc<Mutex<Shared>>,
96 difficulty_settings: Arc<RwLock<DifficultySettings>>,
97}
98
99struct Inner<State> {
100 pub id: ConnectionID,
101 pub session_id: SessionID,
102 pub ip: SocketAddr,
103 pub state: State,
104}
105
106pub(crate) struct Shared {
108 status: SessionState,
110 sender: UnboundedSender<SendInformation>,
112 needs_ban: bool,
113 last_active: Instant,
114 info: SessionInfo,
116}
117
118impl<State: Clone> Session<State> {
119 pub fn new(
120 id: ConnectionID,
121 session_id: SessionID,
122 ip: SocketAddr,
123 sender: UnboundedSender<SendInformation>,
124 config_manager: ConfigManager,
125 cancel_token: CancellationToken,
126 state: State,
127 ) -> Result<Self> {
128 let config = config_manager.current_config();
129
130 let shared = Shared {
131 status: SessionState::Connected,
132 last_active: Instant::now(),
133 needs_ban: false,
134 sender,
135 info: SessionInfo::new(),
136 };
137
138 let inner = Inner {
139 id,
140 session_id,
141 ip,
142 state,
143 };
144
145 Ok(Session {
146 config_manager,
147 cancel_token,
148 miner_list: MinerList::new(),
149 shared: Arc::new(Mutex::new(shared)),
150 inner: Arc::new(inner),
151 difficulty_settings: Arc::new(RwLock::new(DifficultySettings {
152 default: Difficulty::from(config.difficulty.initial_difficulty),
153 minimum: Difficulty::from(config.difficulty.minimum_difficulty),
154 })),
155 })
156 }
157
158 #[must_use]
159 pub fn is_disconnected(&self) -> bool {
160 self.shared.lock().status == SessionState::Disconnected
161 }
162
163 pub fn send<T: Serialize>(&self, message: T) -> Result<()> {
164 let shared = self.shared.lock();
165
166 if shared.last_active.elapsed()
167 > Duration::from_secs(
168 self.config_manager
169 .current_config()
170 .connection
171 .active_timeout,
172 )
173 {
174 error!(
175 "Session: {} not active for 10 minutes. Disconnecting",
176 self.inner.id,
177 );
178 drop(shared);
179
180 self.ban();
181
182 return Ok(());
184 }
185
186 let msg = SendInformation::Json(serde_json::to_string(&message)?);
187
188 debug!("Sending message: {}", msg);
189
190 shared.sender.send(msg)?;
194
195 Ok(())
196 }
197
198 pub fn send_raw(&self, message: Buffer) -> Result<()> {
199 let shared = self.shared.lock();
200
201 shared.sender.send(SendInformation::Raw(message))?;
202
203 Ok(())
204 }
205
206 pub fn shutdown(&self) {
207 if !self.is_disconnected() {
208 self.disconnect();
209
210 self.cancel_token.cancel();
211 }
212 }
213
214 pub fn disconnect(&self) {
215 self.shared.lock().status = SessionState::Disconnected;
216 }
217
218 pub fn ban(&self) {
219 self.shared.lock().needs_ban = true;
220 self.shutdown();
221 }
222
223 #[must_use]
224 pub fn needs_ban(&self) -> bool {
225 self.shared.lock().needs_ban
226 }
227
228 #[must_use]
229 pub fn id(&self) -> &ConnectionID {
230 &self.inner.id
231 }
232
233 pub fn register_worker(
234 &self,
235 session_id: SessionID,
236 client: Option<String>,
237 worker_name: Option<String>,
238 worker_id: Uuid,
239 ) {
240 debug!(id = ?self.inner.id, "Registered Worker {worker_id} ({}) Session ID: {session_id}", worker_name.clone().unwrap_or_default());
242
243 let worker = Miner::new(
244 self.id().clone(),
245 worker_id,
246 session_id,
247 client,
248 worker_name,
249 self.config_manager.clone(),
250 self.difficulty_settings.read().clone(),
251 );
252
253 self.miner_list.add_miner(session_id, worker);
254 }
255
256 #[must_use]
257 pub fn unregister_worker(&self, session_id: SessionID) -> Option<(SessionID, Miner)> {
258 self.miner_list.remove_miner(session_id)
259 }
260
261 #[must_use]
262 pub fn get_miner_list(&self) -> MinerList {
263 self.miner_list.clone()
264 }
265
266 #[must_use]
267 pub fn get_worker_by_session_id(&self, session_id: SessionID) -> Option<Miner> {
268 self.miner_list.get_miner_by_id(session_id)
269 }
270
271 pub fn update_worker_by_session_id(&self, session_id: SessionID, miner: Miner) {
273 self.miner_list
274 .update_miner_by_session_id(session_id, miner);
275 }
276
277 pub fn set_client(&self, client: &str) {
280 let mut agent = false;
281 let mut long_timeout = false;
282 if client.starts_with("btccom-agent/") {
287 agent = true;
289 long_timeout = true;
290 }
291
292 let mut shared = self.shared.lock();
293 shared.info.agent = agent;
294 shared.info.client = Some(client.to_string());
295 shared.info.is_long_timeout = long_timeout;
296 }
297
298 #[must_use]
299 pub fn get_connection_info(&self) -> SessionInfo {
300 self.shared.lock().info.clone()
301 }
302
303 #[must_use]
304 pub fn is_long_timeout(&self) -> bool {
305 self.shared.lock().info.is_long_timeout
306 }
307
308 #[must_use]
310 pub fn timeout(&self) -> Duration {
311 let shared = self.shared.lock();
312
313 if shared.info.is_long_timeout {
314 Duration::from_secs(86400 * 7)
316 } else if shared.info.subscribed && shared.info.authorized {
317 Duration::from_secs(600)
319 } else {
320 Duration::from_secs(15)
323 }
324 }
325
326 #[must_use]
327 pub fn get_session_id(&self) -> SessionID {
328 self.inner.session_id
329 }
330
331 #[must_use]
332 pub fn authorized(&self) -> bool {
333 self.shared.lock().info.authorized
334 }
335
336 pub fn authorize(&self) {
337 self.shared.lock().info.authorized = true;
338 }
339
340 #[must_use]
341 pub fn subscribed(&self) -> bool {
342 self.shared.lock().info.subscribed
343 }
344
345 pub fn subscribe(&self) {
346 self.shared.lock().info.subscribed = true;
347 }
348
349 #[must_use]
350 pub fn is_agent(&self) -> bool {
351 self.shared.lock().info.agent
352 }
353
354 pub fn set_difficulty(&self, session_id: SessionID, difficulty: Difficulty) {
355 if let Some(miner) = self.miner_list.get_miner_by_id(session_id) {
356 miner.set_difficulty(difficulty);
357 }
358 }
359
360 pub fn set_default_difficulty(&self, difficulty: Difficulty) {
361 self.difficulty_settings.write().default = difficulty;
362 }
363
364 pub fn set_minimum_difficulty(&self, difficulty: Difficulty) {
366 if difficulty.as_u64() >= self.config_manager.difficulty_config().minimum_difficulty {
369 self.difficulty_settings.write().minimum = difficulty;
370 }
371 }
372
373 #[must_use]
374 pub fn get_difficulties(&self, session_id: SessionID) -> Option<Difficulties> {
375 self.miner_list
376 .get_miner_by_id(session_id)
377 .map(|miner| miner.difficulties())
378 }
379
380 #[must_use]
382 pub fn state(&self) -> &State {
383 &self.inner.state
384 }
385
386 #[must_use]
387 pub fn update_difficulty(&self, session_id: SessionID) -> Option<Difficulty> {
388 if let Some(miner) = self.miner_list.get_miner_by_id(session_id) {
389 miner.update_difficulty()
390 } else {
391 None
392 }
393 }
394
395 pub(crate) fn active(&self) {
396 self.shared.lock().last_active = Instant::now();
397 }
398
399 #[must_use]
400 pub fn ip(&self) -> SocketAddr {
401 self.inner.ip
402 }
403
404 }
410
411#[cfg(feature = "test-utils")]
412impl<State: Clone> Session<State> {
413 pub fn mock(state: State) -> Session<State> {}
414}