nym_gateway_stats_storage/
lib.rs

1// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
2// SPDX-License-Identifier: GPL-3.0-only
3
4use error::StatsStorageError;
5use models::StoredFinishedSession;
6use nym_node_metrics::entry::{ActiveSession, FinishedSession};
7use nym_sphinx::DestinationAddressBytes;
8use nym_statistics_common::types::SessionType;
9use sessions::SessionManager;
10use sqlx::{
11    ConnectOptions,
12    sqlite::{SqliteAutoVacuum, SqliteSynchronous},
13};
14use std::path::Path;
15use time::Date;
16use tracing::{debug, error};
17
18pub mod error;
19pub mod models;
20mod sessions;
21
22// note that clone here is fine as upon cloning the same underlying pool will be used
23#[derive(Clone)]
24pub struct PersistentStatsStorage {
25    session_manager: SessionManager,
26}
27
28impl PersistentStatsStorage {
29    /// Initialises `PersistentStatsStorage` using the provided path.
30    ///
31    /// # Arguments
32    ///
33    /// * `database_path`: path to the database.
34    pub async fn init<P: AsRef<Path> + Send>(database_path: P) -> Result<Self, StatsStorageError> {
35        debug!(
36            "Attempting to connect to database {}",
37            database_path.as_ref().display()
38        );
39
40        // TODO: we can inject here more stuff based on our gateway global config
41        // struct. Maybe different pool size or timeout intervals?
42        let opts = sqlx::sqlite::SqliteConnectOptions::new()
43            .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
44            .synchronous(SqliteSynchronous::Normal)
45            .auto_vacuum(SqliteAutoVacuum::Incremental)
46            .filename(database_path)
47            .create_if_missing(true)
48            .disable_statement_logging();
49
50        // TODO: do we want auto_vacuum ?
51
52        let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
53            Ok(db) => db,
54            Err(err) => {
55                error!("Failed to connect to SQLx database: {err}");
56                return Err(err.into());
57            }
58        };
59
60        if let Err(err) = sqlx::migrate!("./migrations").run(&connection_pool).await {
61            error!("Failed to perform migration on the SQLx database: {err}");
62            return Err(err.into());
63        }
64
65        // the cloning here are cheap as connection pool is stored behind an Arc
66        Ok(PersistentStatsStorage {
67            session_manager: sessions::SessionManager::new(connection_pool),
68        })
69    }
70
71    //Sessions fn
72    pub async fn insert_finished_session(
73        &self,
74        date: Date,
75        session: FinishedSession,
76    ) -> Result<(), StatsStorageError> {
77        Ok(self
78            .session_manager
79            .insert_finished_session(
80                date,
81                session.duration.as_millis() as i64,
82                session.typ.to_string(),
83            )
84            .await?)
85    }
86
87    pub async fn get_finished_sessions(
88        &self,
89        date: Date,
90    ) -> Result<Vec<StoredFinishedSession>, StatsStorageError> {
91        Ok(self.session_manager.get_finished_sessions(date).await?)
92    }
93
94    pub async fn delete_finished_sessions(
95        &self,
96        before_date: Date,
97    ) -> Result<(), StatsStorageError> {
98        Ok(self
99            .session_manager
100            .delete_finished_sessions(before_date)
101            .await?)
102    }
103
104    pub async fn insert_unique_user(
105        &self,
106        date: Date,
107        client_address_bs58: String,
108    ) -> Result<(), StatsStorageError> {
109        Ok(self
110            .session_manager
111            .insert_unique_user(date, client_address_bs58)
112            .await?)
113    }
114
115    pub async fn get_unique_users(&self, date: Date) -> Result<Vec<String>, StatsStorageError> {
116        Ok(self.session_manager.get_unique_users(date).await?)
117    }
118
119    pub async fn delete_unique_users(&self, before_date: Date) -> Result<(), StatsStorageError> {
120        Ok(self
121            .session_manager
122            .delete_unique_users(before_date)
123            .await?)
124    }
125
126    pub async fn delete_unique_user(
127        &self,
128        client_address: DestinationAddressBytes,
129    ) -> Result<(), StatsStorageError> {
130        Ok(self
131            .session_manager
132            .delete_unique_user(client_address.as_base58_string())
133            .await?)
134    }
135
136    pub async fn insert_active_session(
137        &self,
138        client_address: DestinationAddressBytes,
139        session: ActiveSession,
140    ) -> Result<(), StatsStorageError> {
141        Ok(self
142            .session_manager
143            .insert_active_session(
144                client_address.as_base58_string(),
145                session.start,
146                session.typ.to_string(),
147            )
148            .await?)
149    }
150
151    pub async fn remember_active_session(
152        &self,
153        client_address: DestinationAddressBytes,
154    ) -> Result<(), StatsStorageError> {
155        Ok(self
156            .session_manager
157            .remember_active_session(client_address.as_base58_string())
158            .await?)
159    }
160
161    pub async fn update_active_session_type(
162        &self,
163        client_address: DestinationAddressBytes,
164        session_type: SessionType,
165    ) -> Result<(), StatsStorageError> {
166        Ok(self
167            .session_manager
168            .update_active_session_type(client_address.as_base58_string(), session_type.to_string())
169            .await?)
170    }
171
172    pub async fn get_active_session(
173        &self,
174        client_address: DestinationAddressBytes,
175    ) -> Result<Option<ActiveSession>, StatsStorageError> {
176        Ok(self
177            .session_manager
178            .get_active_session(client_address.as_base58_string())
179            .await?
180            .map(Into::into))
181    }
182
183    pub async fn get_all_active_sessions(&self) -> Result<Vec<ActiveSession>, StatsStorageError> {
184        Ok(self
185            .session_manager
186            .get_all_active_sessions()
187            .await?
188            .into_iter()
189            .map(Into::into)
190            .collect())
191    }
192
193    pub async fn get_started_sessions_count(
194        &self,
195        start_date: Date,
196    ) -> Result<i64, StatsStorageError> {
197        Ok(self
198            .session_manager
199            .get_started_sessions_count(start_date)
200            .await?)
201    }
202
203    pub async fn get_active_users(&self) -> Result<Vec<String>, StatsStorageError> {
204        Ok(self.session_manager.get_active_users().await?)
205    }
206
207    pub async fn delete_active_session(
208        &self,
209        client_address: DestinationAddressBytes,
210    ) -> Result<(), StatsStorageError> {
211        Ok(self
212            .session_manager
213            .delete_active_session(client_address.as_base58_string())
214            .await?)
215    }
216
217    pub async fn cleanup_active_sessions(&self) -> Result<(), StatsStorageError> {
218        Ok(self.session_manager.cleanup_active_sessions().await?)
219    }
220}